目录结构
引入 maven依赖
<parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>1.5.4.release</version> <relativepath/> </parent> <properties> <project.build.sourceencoding>utf-8</project.build.sourceencoding> <project.reporting.outputencoding>utf-8</project.reporting.outputencoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-activemq</artifactid> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-maven-plugin</artifactid> </plugin> </plugins> </build>
引入 application.yml配置
spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: adminqueue: springboot-queueserver: port: 8080
创建queueconfig
@configurationpublic class queueconfig { @value(${queue}) private string queue; @bean public queue logqueue() { return new activemqqueue(queue); } @bean public jmstemplate jmstemplate(activemqconnectionfactory activemqconnectionfactory, queue queue) { jmstemplate jmstemplate = new jmstemplate(); jmstemplate.setdeliverymode(2);// 进行持久化配置 1表示非持久化,2表示持久化</span> jmstemplate.setconnectionfactory(activemqconnectionfactory); jmstemplate.setdefaultdestination(queue); // 此处可不设置默认,在发送消息时也可设置队列 jmstemplate.setsessionacknowledgemode(4);// 客户端签收模式</span> return jmstemplate; } // 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂 @bean(name = jmsqueuelistener) public defaultjmslistenercontainerfactory jmsqueuelistenercontainerfactory( activemqconnectionfactory activemqconnectionfactory) { defaultjmslistenercontainerfactory factory = new defaultjmslistenercontainerfactory(); factory.setconnectionfactory(activemqconnectionfactory); // 设置连接数 factory.setconcurrency(1-10); // 重连间隔时间 factory.setrecoveryinterval(1000l); factory.setsessionacknowledgemode(4); return factory; }}
创建生产者:
@springbootapplication@component@enableschedulingpublic class producer { @autowired private jmsmessagingtemplate jmsmessagingtemplate; @autowired private queue queue; @scheduled(fixeddelay=3000) public void send() { string result = system.currenttimemillis()+---测试; system.out.println(result+result); jmsmessagingtemplate.convertandsend(queue,result); } public static void main(string[] args) { springapplication.run(producer.class, args); }}
创建消费者的application.yml
spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: adminqueue: springboot-queueserver: port: 8081
创建消费者:
@component@springbootapplicationpublic class consumer { private int count =0; @jmslistener(destination = ${queue}) public void receive(textmessage textmessage,session session) throws jmsexception { string text = textmessage.gettext(); system.out.println(消费:+text+第几次获取消息count:+(++count)); system.out.println(); string jmsmessageid = textmessage.getjmsmessageid(); } public static void main(string[] args) { springapplication.run(consumer.class,args); }}
结果显示:
以上就是怎么在springboot中整合activemq的详细内容。
