您好,欢迎访问一九零五行业门户网

activeMQ发布订阅模式中中常用工具类

package com.jms;import java.util.map;import java.util.concurrent.concurrenthashmap;import javax.jms.bytesmessage;import javax.jms.connection;import javax.jms.connectionfactory;import javax.jms.destination;import javax.jms.jmsexception;impo
package com.jms;import java.util.map;import java.util.concurrent.concurrenthashmap;import javax.jms.bytesmessage;import javax.jms.connection;import javax.jms.connectionfactory;import javax.jms.destination;import javax.jms.jmsexception;import javax.jms.messageconsumer;import javax.jms.messageproducer;import javax.jms.queue;import javax.jms.session;import javax.jms.textmessage;import org.apache.activemq.activemqconnection;import org.apache.activemq.activemqconnectionfactory;import org.clapper.util.logging.logger;import com.pzoom.dsa.common.util.log;import com.pzoom.dsa.nerd.mysql.dbqueryhelper;public class jms{ static connectionfactory connectionfactory; static connection connection = null; static session session; static map sendqueues = new concurrenthashmap(); static map getqueues = new concurrenthashmap(); static log log=log.getlogger(dbqueryhelper.class); static { connectionfactory = new activemqconnectionfactory( activemqconnection.default_user, activemqconnection.default_password, tcp://10.100.100.100:61616?wireformat.maxinactivityduration=0); try { connection = connectionfactory.createconnection(); connection.start(); session = connection.createsession(boolean.false.booleanvalue(), 1); } catch (exception e) { e.printstacktrace(); } } static messageproducer getmessageproducer(string name) { if (sendqueues.containskey(name)) return ((messageproducer)sendqueues.get(name)); try { destination destination = session.createqueue(name); messageproducer producer = session.createproducer(destination); sendqueues.put(name, producer); return producer; } catch (jmsexception e) { e.printstacktrace(); } return ((messageproducer)sendqueues.get(name)); } static messageconsumer getmessageconsumer(string name) { if (getqueues.containskey(name)) return ((messageconsumer)getqueues.get(name)); try { destination destination = session.createqueue(name); messageconsumer consumer = session.createconsumer(destination); getqueues.put(name, consumer); return consumer; } catch (jmsexception e) { e.printstacktrace(); } return ((messageconsumer)getqueues.get(name)); } public static void sendmessage(string queue, string text) { try { textmessage message = session.createtextmessage(text); getmessageproducer(queue).send(message); // log.info(sendmessage + queue + \t\t + text); } catch (jmsexception e) { e.printstacktrace(); } } public static string getmessage(string queue) { try { textmessage message = (textmessage)getmessageconsumer(queue).receive(10000l); if (message != null) return message.gettext(); } catch (jmsexception e) { e.printstacktrace(); } return null; } public static void close() { try { session.close(); } catch (jmsexception e) { e.printstacktrace(); } try { connection.close(); } catch (jmsexception e) { e.printstacktrace(); } }}
其它类似信息

推荐信息