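RabbitMQ is a reusable enterprise messaging system built on AMQP. This article works through an example of sending and receiving messages with RabbitMQ from Java.
AMQP (Advanced Message Queuing Protocol) is an open standard application-layer protocol designed for message-oriented middleware. Message middleware is mainly used to decouple components: the sender of a message does not need to know that a consumer exists, and vice versa.
The main features of AMQP are message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability, and security.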
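To make the routing idea concrete, here is a minimal in-memory sketch of how a direct exchange delivers a message to the queues bound with a matching routing key. It is a toy model only, not a RabbitMQ client; the class name ToyDirectExchange and its methods are hypothetical and exist purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model of direct-exchange routing; names are hypothetical, not a RabbitMQ API.
class ToyDirectExchange {
    // routing key -> queues bound with that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Bind a queue to this exchange under the given routing key.
    void bind(Queue<String> queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Deliver the message to every queue bound with exactly this routing key.
    // Returns the number of queues that received it (0 means it was unroutable).
    int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.get(routingKey);
        if (targets == null) {
            return 0;
        }
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }
}
```

In the code later in this article the routing key is simply the queue name, so each queue receives exactly the messages addressed to it.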
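RabbitMQ is an open-source AMQP implementation. The server is written in Erlang, and clients are available for many languages and protocols, such as Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, and STOMP, with AJAX support as well. It is used to store and forward messages in distributed systems and performs well in ease of use, scalability, and high availability.
This article does not cover AMQP or RabbitMQ background; please look that up separately.
The implementation here uses spring-rabbit to send and receive messages.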
see http://www.rabbitmq.com/tutorials/tutorial-one-java.html
see http://www.springsource.org/spring-amqp
The code for sending and receiving RabbitMQ messages in Java is as follows, starting with the Maven dependencies:
<dependencies>
    <!-- for rabbitmq -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.caucho</groupId>
        <artifactId>hessian</artifactId>
        <version>4.0.7</version>
    </dependency>
</dependencies>
First, we need a holder object for passing messages between the application and RabbitMQ.
import java.io.Serializable;
import java.util.Arrays;

public class EventMessage implements Serializable {
    private String queueName;
    private String exchangeName;
    private byte[] eventData;

    public EventMessage(String queueName, String exchangeName, byte[] eventData) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.eventData = eventData;
    }

    public EventMessage() {
    }

    public String getQueueName() {
        return queueName;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public byte[] getEventData() {
        return eventData;
    }

    @Override
    public String toString() {
        return "EopEventMessage [queueName=" + queueName + ", exchangeName="
                + exchangeName + ", eventData=" + Arrays.toString(eventData)
                + "]";
    }
}
To send and receive this holder object, we also need a factory for serialization and deserialization.
public interface CodecFactory {
    byte[] serialize(Object obj) throws IOException;

    Object deserialize(byte[] in) throws IOException;
}
Below is the codec implementation, which uses Hessian; you are free to choose a different serialization method.
public class HessionCodecFactory implements CodecFactory {
    private final Logger logger = Logger.getLogger(HessionCodecFactory.class);

    @Override
    public byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream baos = null;
        HessianOutput output = null;
        try {
            baos = new ByteArrayOutputStream(1024);
            output = new HessianOutput(baos);
            output.startCall();
            output.writeObject(obj);
            output.completeCall();
        } catch (final IOException ex) {
            throw ex;
        } finally {
            if (output != null) {
                try {
                    baos.close();
                } catch (final IOException ex) {
                    this.logger.error("failed to close stream.", ex);
                }
            }
        }
        return baos != null ? baos.toByteArray() : null;
    }

    @Override
    public Object deserialize(byte[] in) throws IOException {
        Object obj = null;
        ByteArrayInputStream bais = null;
        HessianInput input = null;
        try {
            bais = new ByteArrayInputStream(in);
            input = new HessianInput(bais);
            input.startReply();
            obj = input.readObject();
            input.completeReply();
        } catch (final IOException ex) {
            throw ex;
        } catch (final Throwable e) {
            this.logger.error("failed to decode object.", e);
        } finally {
            if (input != null) {
                try {
                    bais.close();
                } catch (final IOException ex) {
                    this.logger.error("failed to close stream.", ex);
                }
            }
        }
        return obj;
    }
}
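Since the serialization method is interchangeable, here is a minimal sketch of an alternative codec built on plain JDK serialization. The class name JdkCodecSketch and the roundTrip helper are hypothetical; the two main methods mirror the serialize/deserialize signatures of the codec interface but, to stay self-contained, the class does not implement it. Note that JDK deserialization should only be applied to data from trusted sources.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical alternative codec using JDK serialization (a sketch, not the article's codec).
class JdkCodecSketch {
    // Serialize any Serializable object (or null) to a byte array.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        // Closing the ObjectOutputStream flushes everything into baos.
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(obj);
        }
        return baos.toByteArray();
    }

    // Rebuild the object from bytes produced by serialize().
    static Object deserialize(byte[] in) throws IOException {
        try (ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(in))) {
            return input.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("payload refers to an unknown class", e);
        }
    }

    // Convenience helper for quick checks: serialize, then deserialize.
    static Object roundTrip(Object obj) {
        try {
            return deserialize(serialize(obj));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Any object passed through such a codec, including everything it references, must implement Serializable.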
Next, implement sending. Add an interface dedicated to sending:
public interface EventTemplate {
    void send(String queueName, String exchangeName, Object eventContent) throws SendRefuseException;

    void send(String queueName, String exchangeName, Object eventContent, CodecFactory codecFactory) throws SendRefuseException;
}
SendRefuseException is a custom exception class for send failures.
Below is the implementation of EventTemplate; its main job is to convert the data into an EventMessage.
public class DefaultEventTemplate implements EventTemplate {
    private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
    private AmqpTemplate eventAmqpTemplate;
    private CodecFactory defaultCodecFactory;
    // private DefaultEventController eec;

    // public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
    //         CodecFactory defaultCodecFactory, DefaultEventController eec) {
    //     this.eventAmqpTemplate = eopAmqpTemplate;
    //     this.defaultCodecFactory = defaultCodecFactory;
    //     this.eec = eec;
    // }

    public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate, CodecFactory defaultCodecFactory) {
        this.eventAmqpTemplate = eopAmqpTemplate;
        this.defaultCodecFactory = defaultCodecFactory;
    }

    @Override
    public void send(String queueName, String exchangeName, Object eventContent)
            throws SendRefuseException {
        this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
    }

    @Override
    public void send(String queueName, String exchangeName, Object eventContent,
            CodecFactory codecFactory) throws SendRefuseException {
        if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
            throw new SendRefuseException("queueName exchangeName can not be empty.");
        }
        // if (!eec.beBinded(exchangeName, queueName))
        //     eec.declareBinding(exchangeName, queueName);
        byte[] eventContentBytes = null;
        if (codecFactory == null) {
            if (eventContent == null) {
                logger.warn("find eventContent is null,are you sure...");
            } else {
                throw new SendRefuseException(
                        "codecFactory must not be null ,unless eventContent is null");
            }
        } else {
            try {
                eventContentBytes = codecFactory.serialize(eventContent);
            } catch (IOException e) {
                throw new SendRefuseException(e);
            }
        }
        // Wrap the payload in an EventMessage
        EventMessage msg = new EventMessage(queueName, exchangeName,
                eventContentBytes);
        try {
            // The queue name doubles as the routing key
            eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
        } catch (AmqpException e) {
            logger.error("send event fail. event message : [" + eventContent + "]", e);
            throw new SendRefuseException("send event fail", e);
        }
    }
}
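The commented-out parts will be used later; they mainly guard against sending to an exchange or queue that has not been declared beforehand.
Next, implement receiving messages.
First we need a consumer interface, which every consumer implements:
public interface EventProcesser {
    public void process(Object e);
}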
To hand different types of messages to their corresponding processors, we also need a message handling adapter.
/**
 * The POJO delegate for MessageListenerAdapter.
 * <p>Message handling adapter. Main functions:</p>
 * <p>1. Bind each message type to its processor and cache the mapping locally, e.g. all messages of queue01+exchange01 are handled by processor A</p>
 * <p>2. Dispatch incoming messages, calling the matching processor to consume the messages that belong to it</p>
 *
 */
public class MessageAdapterHandler {
    private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
    private ConcurrentMap<String, EventProcessorWrap> epwMap;

    public MessageAdapterHandler() {
        this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
    }

    public void handleMessage(EventMessage eem) {
        logger.debug("receive an EventMessage: [" + eem + "]");
        // Check for null first; under some error conditions a null message can arrive
        if (eem == null) {
            logger.warn("receive an null EventMessage, it may product some errors, and processing message is canceled.");
            return;
        }
        if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
            logger.warn("the EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
            return;
        }
        // Decode and hand over to the matching processor
        EventProcessorWrap eepw = epwMap.get(eem.getQueueName() + "|" + eem.getExchangeName());
        if (eepw == null) {
            logger.warn("receive an EopEventMessage, but no processor can do it.");
            return;
        }
        try {
            eepw.process(eem.getEventData());
        } catch (IOException e) {
            logger.error("event content can not be deserialized, check the provided CodecFactory.", e);
            return;
        }
    }

    protected void add(String queueName, String exchangeName, EventProcesser processor, CodecFactory codecFactory) {
        if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
            throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
        }
        EventProcessorWrap epw = new EventProcessorWrap(codecFactory, processor);
        // putIfAbsent keeps the first registration; a later one for the same key is ignored
        EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
        if (oldProcessorWrap != null) {
            logger.warn("the processor of this queue and exchange exists, and the new one can't be add");
        }
    }

    protected Set<String> getAllBinding() {
        Set<String> keySet = epwMap.keySet();
        return keySet;
    }

    protected static class EventProcessorWrap {
        private CodecFactory codecFactory;
        private EventProcesser eep;

        protected EventProcessorWrap(CodecFactory codecFactory,
                EventProcesser eep) {
            this.codecFactory = codecFactory;
            this.eep = eep;
        }

        public void process(byte[] eventData) throws IOException {
            Object obj = codecFactory.deserialize(eventData);
            eep.process(obj);
        }
    }
}
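The dispatch idea in the adapter can be isolated into a small sketch: processors are stored under a "queueName|exchangeName" key, the first registration wins, and messages without a registered processor are skipped. The class DispatchSketch and its use of java.util.function.Consumer are assumptions made for illustration; the adapter above uses its own processor wrapper and codec.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical, dependency-free sketch of key-based dispatch.
class DispatchSketch {
    private final ConcurrentMap<String, Consumer<Object>> processors = new ConcurrentHashMap<>();

    // Returns true if the processor was registered,
    // false if one already existed for this key (the existing one is kept).
    boolean register(String queueName, String exchangeName, Consumer<Object> processor) {
        return processors.putIfAbsent(queueName + "|" + exchangeName, processor) == null;
    }

    // Returns true if a processor consumed the payload, false if none was registered.
    boolean dispatch(String queueName, String exchangeName, Object payload) {
        Consumer<Object> processor = processors.get(queueName + "|" + exchangeName);
        if (processor == null) {
            return false;
        }
        processor.accept(payload);
        return true;
    }
}
```

Using putIfAbsent makes registration safe under concurrent calls without extra locking, at the cost of silently keeping the older processor when a key is registered twice.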
That covers normal message handling. Exceptions raised while receiving RabbitMQ messages should be monitored as well, so add a class dedicated to handling them:
public class MessageErrorHandler implements ErrorHandler {
    private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);

    @Override
    public void handleError(Throwable t) {
        logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
    }
}
Next, we need a class that holds the settings for communicating with RabbitMQ, such as the host and port.
public class EventControlConfig {
    private final static int DEFAULT_PORT = 5672;
    private final static String DEFAULT_USERNAME = "guest";
    private final static String DEFAULT_PASSWORD = "guest";
    private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
    private static final int PREFETCH_SIZE = 1;

    private String serverHost;
    private int port = DEFAULT_PORT;
    private String username = DEFAULT_USERNAME;
    private String password = DEFAULT_PASSWORD;
    private String virtualHost;
    /**
     * Timeout for establishing the connection to RabbitMQ
     */
    private int connectionTimeout = 0;
    /**
     * Number of event message processing threads; defaults to CPU cores * 2
     */
    private int eventMsgProcessNum;
    /**
     * Prefetch count for each consumer
     */
    private int prefetchSize;

    public EventControlConfig(String serverHost) {
        this(serverHost, DEFAULT_PORT, DEFAULT_USERNAME, DEFAULT_PASSWORD, null, 0, DEFAULT_PROCESS_THREAD_NUM, DEFAULT_PROCESS_THREAD_NUM, new HessionCodecFactory());
    }

    public EventControlConfig(String serverHost, int port, String username,
            String password, String virtualHost, int connectionTimeout,
            int eventMsgProcessNum, int prefetchSize, CodecFactory defaultCodecFactory) {
        this.serverHost = serverHost;
        // Non-positive values fall back to the defaults
        this.port = port > 0 ? port : DEFAULT_PORT;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
        this.connectionTimeout = connectionTimeout > 0 ? connectionTimeout : 0;
        this.eventMsgProcessNum = eventMsgProcessNum > 0 ? eventMsgProcessNum : DEFAULT_PROCESS_THREAD_NUM;
        this.prefetchSize = prefetchSize > 0 ? prefetchSize : PREFETCH_SIZE;
    }

    public String getServerHost() {
        return serverHost;
    }

    public int getPort() {
        return port;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public String getVirtualHost() {
        return virtualHost;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public int getEventMsgProcessNum() {
        return eventMsgProcessNum;
    }

    public int getPrefetchSize() {
        return prefetchSize;
    }
}
With the concrete sending and receiving code in place, the next and most important piece is managing and controlling the communication with RabbitMQ.
public interface EventController {
    /**
     * Starts the controller
     */
    void start();

    /**
     * Returns the send template
     */
    EventTemplate getEopEventTemplate();

    /**
     * Binds a consumer to the given exchange and queue
     */
    EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);

    /* in the map, the key is the queue name and the value is the exchange name */
    EventController add(Map<String, String> bindings, EventProcesser eventProcesser);
}
Its implementation is as follows:
/**
 * Controller for communicating with RabbitMQ. Its main responsibilities:
 * <p>1. Establish the connection to RabbitMQ</p>
 * <p>2. Declare exchanges and queues and the bindings between them</p>
 * <p>3. Start the message listener container and bind each message's processor to its exchange and queue</p>
 * <p>4. Hold the send template and a local cache of all exchanges, queues, and bindings</p>
 * @author yangyong
 *
 */
public class DefaultEventController implements EventController {
    private CachingConnectionFactory rabbitConnectionFactory;
    private EventControlConfig config;
    private RabbitAdmin rabbitAdmin;
    private CodecFactory defaultCodecFactory = new HessionCodecFactory();
    private SimpleMessageListenerContainer msgListenerContainer; // RabbitMQ msg listener container
    private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
    private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // specified directly
    // exchange cache, key is exchangeName
    private Map<String, DirectExchange> exchanges = new HashMap<String, DirectExchange>();
    // queue cache, key is queueName
    private Map<String, Queue> queues = new HashMap<String, Queue>();
    // cache of queue-to-exchange bindings, value is exchangeName | queueName
    private Set<String> binded = new HashSet<String>();
    private EventTemplate eventTemplate; // event sending client for the application
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private static DefaultEventController defaultEventController;

    public synchronized static DefaultEventController getInstance(EventControlConfig config) {
        if (defaultEventController == null) {
            defaultEventController = new DefaultEventController(config);
        }
        return defaultEventController;
    }

    private DefaultEventController(EventControlConfig config) {
        if (config == null) {
            throw new IllegalArgumentException("config can not be null.");
        }
        this.config = config;
        initRabbitConnectionFactory();
        // Initialize the AmqpAdmin
        rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
        // Initialize the RabbitTemplate
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
        rabbitTemplate.setMessageConverter(serializerMessageConverter);
        // Uses the three-argument constructor that is commented out in DefaultEventTemplate
        eventTemplate = new DefaultEventTemplate(rabbitTemplate, defaultCodecFactory, this);
    }
    /**
     * Initializes the RabbitMQ connection
     */
    private void initRabbitConnectionFactory() {
        rabbitConnectionFactory = new CachingConnectionFactory();
        rabbitConnectionFactory.setHost(config.getServerHost());
        rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
        rabbitConnectionFactory.setPort(config.getPort());
        rabbitConnectionFactory.setUsername(config.getUsername());
        rabbitConnectionFactory.setPassword(config.getPassword());
        if (!StringUtils.isEmpty(config.getVirtualHost())) {
            rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
        }
    }

    /**
     * Shuts the controller down
     */
    public synchronized void destroy() throws Exception {
        if (!isStarted.get()) {
            return;
        }
        msgListenerContainer.stop();
        eventTemplate = null;
        rabbitAdmin = null;
        rabbitConnectionFactory.destroy();
    }

    @Override
    public void start() {
        if (isStarted.get()) {
            return;
        }
        Set<String> mapping = msgAdapterHandler.getAllBinding();
        for (String relation : mapping) {
            // Keys are stored as queueName|exchangeName
            String[] relaArr = relation.split("\\|");
            declareBinding(relaArr[1], relaArr[0]);
        }
        initMsgListenerAdapter();
        isStarted.set(true);
    }

    /**
     * Initializes the message listener container
     */
    private void initMsgListenerAdapter() {
        MessageListener listener = new MessageListenerAdapter(msgAdapterHandler, serializerMessageConverter);
        msgListenerContainer = new SimpleMessageListenerContainer();
        msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
        msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        msgListenerContainer.setMessageListener(listener);
        msgListenerContainer.setErrorHandler(new MessageErrorHandler());
        msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // prefetch count per consumer
        msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
        msgListenerContainer.setTxSize(config.getPrefetchSize()); // number of messages processed per transaction
        msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));
        msgListenerContainer.start();
    }
    @Override
    public EventTemplate getEopEventTemplate() {
        return eventTemplate;
    }

    @Override
    public EventController add(String queueName, String exchangeName, EventProcesser eventProcesser) {
        return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
    }

    public EventController add(String queueName, String exchangeName, EventProcesser eventProcesser, CodecFactory codecFactory) {
        // Pass the caller's codecFactory through rather than always using the default
        msgAdapterHandler.add(queueName, exchangeName, eventProcesser, codecFactory);
        if (isStarted.get()) {
            initMsgListenerAdapter();
        }
        return this;
    }

    @Override
    public EventController add(Map<String, String> bindings,
            EventProcesser eventProcesser) {
        return add(bindings, eventProcesser, defaultCodecFactory);
    }

    public EventController add(Map<String, String> bindings,
            EventProcesser eventProcesser, CodecFactory codecFactory) {
        for (Map.Entry<String, String> item : bindings.entrySet()) {
            msgAdapterHandler.add(item.getKey(), item.getValue(), eventProcesser, codecFactory);
        }
        return this;
    }
    /**
     * Whether the exchange and queue are already bound
     */
    protected boolean beBinded(String exchangeName, String queueName) {
        return binded.contains(exchangeName + "|" + queueName);
    }

    /**
     * Declares the exchange and queue and the binding between them
     */
    protected synchronized void declareBinding(String exchangeName, String queueName) {
        String bindRelation = exchangeName + "|" + queueName;
        if (binded.contains(bindRelation)) return;
        boolean needBinding = false;
        DirectExchange directExchange = exchanges.get(exchangeName);
        if (directExchange == null) {
            directExchange = new DirectExchange(exchangeName, true, false, null);
            exchanges.put(exchangeName, directExchange);
            rabbitAdmin.declareExchange(directExchange); // declare the exchange
            needBinding = true;
        }
        Queue queue = queues.get(queueName);
        if (queue == null) {
            queue = new Queue(queueName, true, false, false);
            queues.put(queueName, queue);
            rabbitAdmin.declareQueue(queue); // declare the queue
            needBinding = true;
        }
        if (needBinding) {
            Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName); // bind the queue to the exchange
            rabbitAdmin.declareBinding(binding); // declare the binding
            binded.add(bindRelation);
        }
    }
}
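One detail of declareBinding is worth a closer look: the binding is only declared when the exchange or the queue is new to the local cache. If both were already cached through other pairs, the new pair appears never to be bound. The sketch below is a dependency-free variant that binds whenever the pair itself is new. BindingCacheSketch and its brokerCalls counter are hypothetical stand-ins that only simulate the broker calls; this is one possible adjustment, not the original author's code.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation of the declare-and-bind cache; no broker is contacted.
class BindingCacheSketch {
    private final Set<String> exchanges = new HashSet<>();
    private final Set<String> queues = new HashSet<>();
    private final Set<String> bound = new HashSet<>();
    int brokerCalls = 0; // counts simulated declareExchange/declareQueue/declareBinding calls

    // Returns true if a new binding was declared, false if the pair was already bound.
    synchronized boolean declareBinding(String exchangeName, String queueName) {
        String relation = exchangeName + "|" + queueName;
        if (bound.contains(relation)) {
            return false;
        }
        if (exchanges.add(exchangeName)) {
            brokerCalls++; // stands in for declaring the exchange
        }
        if (queues.add(queueName)) {
            brokerCalls++; // stands in for declaring the queue
        }
        brokerCalls++;     // stands in for declaring the binding; done for every new pair
        bound.add(relation);
        return true;
    }
}
```

With this variant, binding an already known exchange to an already known queue costs exactly one simulated broker call, and repeating a known pair costs none.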
Done. The commented-out code in DefaultEventTemplate can now be uncommented. The last step is unit testing; to test passing objects, create a PO:
import java.io.Serializable;
import java.util.List;

@SuppressWarnings("serial")
public class People implements Serializable {
    private int id;
    private String name;
    private boolean male;
    private People spouse;
    private List<People> friends;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isMale() {
        return male;
    }

    public void setMale(boolean male) {
        this.male = male;
    }

    public People getSpouse() {
        return spouse;
    }

    public void setSpouse(People spouse) {
        this.spouse = spouse;
    }

    public List<People> getFriends() {
        return friends;
    }

    public void setFriends(List<People> friends) {
        this.friends = friends;
    }

    @Override
    public String toString() {
        return "People[id=" + id + ",name=" + name + ",male=" + male + "]";
    }
}
Create the unit test:
public class RabbitMqTest {
    private String defaultHost = "127.0.0.1";
    private String defaultExchange = "exchange_direct_test";
    private String defaultQueue = "queue_test";
    private DefaultEventController controller;
    private EventTemplate eventTemplate;

    @Before
    public void init() throws IOException {
        EventControlConfig config = new EventControlConfig(defaultHost);
        controller = DefaultEventController.getInstance(config);
        eventTemplate = controller.getEopEventTemplate();
        controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
        controller.start();
    }

    @Test
    public void sendString() throws SendRefuseException {
        eventTemplate.send(defaultQueue, defaultExchange, "hello world");
    }

    @Test
    public void sendObject() throws SendRefuseException {
        eventTemplate.send(defaultQueue, defaultExchange, mockObj());
    }

    @Test
    public void sendTemp() throws SendRefuseException, InterruptedException {
        String tempExchange = "exchange_direct_test_temp"; // an exchange not declared before
        String tempQueue = "queue_test_temp"; // a queue not declared before
        eventTemplate.send(tempQueue, tempExchange, mockObj());
        // After a successful send nothing is received yet; a consumer still has to be bound
        controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
    }

    @After
    public void end() throws InterruptedException {
        // Give the asynchronous consumers time to process the messages
        Thread.sleep(2000);
    }

    private People mockObj() {
        People jack = new People();
        jack.setId(1);
        jack.setName("jack");
        jack.setMale(true);
        List<People> friends = new ArrayList<>();
        friends.add(jack);
        People hanMeiMei = new People();
        hanMeiMei.setId(1);
        hanMeiMei.setName("韩梅梅");
        hanMeiMei.setMale(false);
        hanMeiMei.setFriends(friends);
        People liLei = new People();
        liLei.setId(2);
        liLei.setName("李雷");
        liLei.setMale(true);
        liLei.setFriends(friends);
        liLei.setSpouse(hanMeiMei);
        hanMeiMei.setSpouse(liLei);
        return hanMeiMei;
    }

    class ApiProcessEventProcessor implements EventProcesser {
        @Override
        public void process(Object e) { // the consumer here only prints the message
            Assert.assertNotNull(e);
            System.out.println(e);
            if (e instanceof People) {
                People people = (People) e;
                System.out.println(people.getSpouse());
                System.out.println(people.getFriends());
            }
        }
    }
}
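The test waits a fixed two seconds after each case so that the asynchronous consumer has time to run. A common alternative is to let the consumer signal a CountDownLatch and have the test wait on it with a timeout. The sketch below simulates the consumer with a plain executor thread instead of RabbitMQ; LatchWaitSketch and sendAndAwait are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: wait for an asynchronous consumer with a latch instead of sleeping.
class LatchWaitSketch {
    // "Sends" the payload to a simulated consumer thread and waits until it was consumed.
    static Object sendAndAwait(Object payload, long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<Object> seen = new AtomicReference<>();
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        try {
            // Stands in for the message listener invoking a processor
            consumer.execute(() -> {
                seen.set(payload);
                received.countDown();
            });
            // Returns as soon as the consumer signals, or fails after the timeout
            if (!received.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("message was not consumed within " + timeoutMillis + " ms");
            }
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            consumer.shutdownNow();
        }
    }
}
```

In a real test the latch would be counted down inside the processor's process method, so the test finishes as soon as the message arrives and fails clearly if it never does.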
Summary
That covers the full example of how to send and receive messages with RabbitMQ in Java.