这篇文章主要介绍了关于对nodejs如何操作消息队列rabbitmq的分析,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下
一. 什么是消息队列?消息(message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(message queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 mq 中而不用管谁来取,消息使用者只管从 mq 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
二. 常用的消息队列有哪些?rabbitmq、rocketmq、activemq、kafka、zeromq、metamq。
甚至现在部分nosql也可做消息队列,如redis。
三. 消息队列的使用场景?异步处理应用解耦流量削峰四. 使用案例上规模的公司都会有自己的日志分析系统,日志系统是怎么实现的呢?
图解:用户在访问应用的时候,我们要记录下用户的操作记录和系统的异常日志,常规的做法是将系统产生的日志保存到服务器磁盘,在服务器中开启定时任务,定时将磁盘的日志信息传入mq中(生产者),也定时将mq中的消息取出并存到相应的数据库,如elasticsearch或hive中。
五. 如何安装rabbitmq?上面的案例介绍了mq的一个使用场景,我这里是用rabbitmq举例,现实项目中可能用到的是kafka。
首先安装brew(mac为例)
/usr/bin/ruby -e "$(curl -fssl https://raw.githubusercontent.com/homebrew/install/master/install)"
安装rabbitmq
brew install rabbitmq
运行rabbitmq
进入到 /usr/local/cellar/rabbitmq/3.7.7,执行
sbin/rabbitmq-server
启动插件
进入到 /usr/local/cellar/rabbitmq/3.7.7/sbin
./rabbitmq-plugins enable rabbitmq_management
登陆管理界面
打开浏览器输入:http://localhost:15672,rabbitmq默认15672端口六. nodejs操作rabbitmq
网上可以找到好几个相应的node sdk,这里推荐amqplib
1.生产者
/** * 对rabbitmq的封装 */let amqp = require('amqplib');class rabbitmq { constructor() { this.hosts = []; this.index = 0; this.length = this.hosts.length; this.open = amqp.connect(this.hosts[this.index]); } sendqueuemsg(queuename, msg, errcallback) { let self = this; self.open .then(function (conn) { return conn.createchannel(); }) .then(function (channel) { return channel.assertqueue(queuename).then(function (ok) { return channel.sendtoqueue(queuename, new buffer(msg), { persistent: true }); }) .then(function (data) { if (data) { errcallback && errcallback("success"); channel.close(); } }) .catch(function () { settimeout(() => { if (channel) { channel.close(); } }, 500) }); }) .catch(function () { let num = self.index++; if (num <= self.length - 1) { self.open = amqp.connect(self.hosts[num]); } else { self.index == 0; } }); }}
2. 消费者
/** * 对rabbitmq的封装 */let amqp = require('amqplib');class rabbitmq { constructor() { this.open = amqp.connect(this.hosts[this.index]); } receivequeuemsg(queuename, receivecallback, errcallback) { let self = this; self.open .then(function (conn) { return conn.createchannel(); }) .then(function (channel) { return channel.assertqueue(queuename) .then(function (ok) { return channel.consume(queuename, function (msg) { if (msg !== null) { let data = msg.content.tostring(); channel.ack(msg); receivecallback && receivecallback(data); } }) .finally(function () { settimeout(() => { if (channel) { channel.close(); } }, 500) }); }) }) .catch(function () { let num = self.index++; if (num <= self.length - 1) { self.open = amqp.connect(self.hosts[num]); } else { self.index = 0; self.open = amqp.connect(self.hosts[0]); } }); }
3. 通过生产者向mq发送一个消息,并创建队列
let mq = new rabbitmq();mq.sendqueuemsg('testqueue', 'my first message', (error) => { console.log(error)})
执行之后,我们打开管理平台,发现rabbbitmq已经接受到了一条消息:
并且rabbbitmq新增了一个队列testqueue
4. 获取指定队列的消息
let mq = new rabbitmq();mq.receivequeuemsg('testqueue',(msg) => { console.log(msg)})// 输出结果:my first message复制代码
此时打开rabbitmq管理平台,消息数量已经变为0
综上:我们简单讲述了消息队列及rabbitmq相关的一些知识,以及我们如何通过nodejs来生产与消费消息。
以上就是本文的全部内容,希望对大家的学习有所帮助,更多相关内容请关注!
相关推荐:
javascript如何实现文件的下载功能
通过node.js来调取baidu-aip-sdk实现身份证识别的功能
以上就是对于nodejs如何操作消息队列rabbitmq的分析的详细内容。