下面由redis教程栏目给大家详解redis和队列,希望对需要的朋友有所帮助!
概要redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:
由于redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。
普通队列实现所以可以直接使用redis的list实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。简单示例如下:
存放消息端(消息生产者):
package org.yamikaze.redis.messsage.queue; import org.yamikaze.redis.test.myjedisfactory;import redis.clients.jedis.jedis; import java.util.concurrent.timeunit; /** * 消息生产者 * @author yamikaze */public class producer extends thread { public static final string message_key = "message:queue"; private jedis jedis; private string producername; private volatile int count; public producer(string name) { this.producername = name; init(); } private void init() { jedis = myjedisfactory.getlocaljedis(); } public void putmessage(string message) { long size = jedis.lpush(message_key, message); system.out.println(producername + ": 当前未被处理消息条数为:" + size); count++; } public int getcount() { return count; } @override public void run() { try { while (true) { putmessage(stringutils.generate32str()); timeunit.seconds.sleep(1); } } catch (interruptedexception e) { } catch (exception e) { e.printstacktrace(); } } public static void main(string[] args) throws interruptedexception{ producer producer = new producer("myproducer"); producer.start(); for(; ;) { system.out.println("main : 已存储消息条数:" + producer.getcount()); timeunit.seconds.sleep(10); } }}
消息处理端(消息消费者):
package org.yamikaze.redis.messsage.queue; import org.yamikaze.redis.test.myjedisfactory;import redis.clients.jedis.jedis; /** * 消息消费者 * @author yamikaze */public class customer extends thread{ private string customername; private volatile int count; private jedis jedis; public customer(string name) { this.customername = name; init(); } private void init() { jedis = myjedisfactory.getlocaljedis(); } public void processmessage() { string message = jedis.rpop(producer.message_key); if(message != null) { count++; handle(message); } } public void handle(string message) { system.out.println(customername + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条"); } @override public void run() { while (true) { processmessage(); } } public static void main(string[] args) { customer customer = new customer("yamikaze"); customer.start(); }}
貌似还不错,但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看list中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:
1)、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
2)、如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。
所以可以使用brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端可以将processmessage可以改为这样:
public void processmessage() { /** * brpop支持多个列表(队列) * brpop指令是支持队列优先级的,比如这个例子中message_key的优先级大于testkey(顺序决定)。 * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回message_key * 0表示不限制等待,会一直阻塞在这儿 */ list<string> messages = jedis.brpop(0, producer.message_key, "testkey"); if(messages.size() != 0) { //由于该指令可以监听多个key,所以返回的是一个列表 //列表由2项组成,1) 列表名,2)数据 string keyname = messages.get(0); //如果返回的是message_key的消息 if(producer.message_key.equals(keyname)) { string message = messages.get(1); handle(message); } } system.out.println("=======================");}
然后可以运行customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开redis的客户端,输入指令client list,可以查看当前有两个连接。
一次生产多次消费的队列redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。利用redis的pub/sub模式可以实现一次生产多次消费的队列。
1)发布
publish指令可用于发布一条消息,格式 publish channel message
返回值表示订阅了该消息的数量。
2)订阅
subscribe指令用于接收一条消息,格式 subscribe channel
可以看到使用subscribe指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。回复分为三种类型:
1、如果为subscribe,第二个值表示订阅的频道,第三个值表示是第几个订阅的频道?(理解成序号?)
2、如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
3、如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。
可以使用指令unsubscribe退订,如果不加参数,则会退订所有由subscribe指令订阅的频道。
redis还支持基于通配符的消息订阅,使用指令psubscribe (pattern subscribe),例如:
再试试推送消息会得到以下结果:
可以看到publish指令返回的是2,而订阅端这边接收了两次消息。这是因为psubscribe指令可以重复订阅频道。而使用psubscribe指令订阅的频道也要使用指令punsubscribe指令退订,该指令无法退订subscribe订阅的频道,同理unsubscribe也不能退订psubscribe指令订阅的频道。同时punsubscribe指令通配符不会展开。
例如:punsubscribe * 不会匹配到 channel.*, 所以要取消订阅channel.*就要这样写pubsubscribe channel.*。
代码示范如下:
package org.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.messsage.queue.stringutils;import org.yamikaze.redis.test.myjedisfactory;import redis.clients.jedis.jedis; /** * 消息发布方 * @author yamikaze */public class publisher { public static final string channel_key = "channel:message"; private jedis jedis; public publisher() { jedis = myjedisfactory.getlocaljedis(); } public void publishmessage(string message) { if(stringutils.isblank(message)) { return; } jedis.publish(channel_key, message); } public static void main(string[] args) { publisher publisher = new publisher(); publisher.publishmessage("hello redis!"); }}
简单的发送一个消息。
消息订阅方:
package org.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.test.myjedisfactory;import redis.clients.jedis.jedis;import redis.clients.jedis.jedispubsub; import java.util.concurrent.timeunit; /** * 消息订阅方客户端 * @author yamikaze */public class subscribeclient { private jedis jedis; private static final string exit_command = "exit"; public subscribeclient() { jedis = myjedisfactory.getlocaljedis(); } public void subscribe(string ...channel) { if(channel == null || channel.length <= 0) { return; } //消息处理,接收到消息时如何处理 jedispubsub jps = new jedispubsub() { /** * jedispubsub类是一个没有抽象方法的抽象类,里面方法都是一些空实现 * 所以可以选择需要的方法覆盖,这儿使用的是subscribe指令,所以覆盖了onmessage * 如果使用psubscribe指令,则覆盖onpmessage方法 * 当然也可以选择binaryjedispubsub,同样是抽象类,但方法参数为byte[] */ @override public void onmessage(string channel, string message) { if(publisher.channel_key.equals(channel)) { system.out.println("接收到消息: channel : " + message); //接收到exit消息后退出 if(exit_command.equals(message)) { system.exit(0); } } } /** * 订阅时 */ @override public void onsubscribe(string channel, int subscribedchannels) { if(publisher.channel_key.equals(channel)) { system.out.println("订阅了频道:" + channel); } } }; //可以订阅多个频道 当前线程会阻塞在这儿 jedis.subscribe(jps, channel); } public static void main(string[] args) { subscribeclient client = new subscribeclient(); client.subscribe(publisher.channel_key); //并没有 unsubscribe方法 //相应的也没有punsubscribe方法 }}
先运行client,再运行publisher进行消息发送,输出结果:
redis的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。
延时队列背景在业务发展过程中,会出现一些需要延时处理的场景,比如:
a.订单下单之后超过30分钟用户未支付,需要取消订单
b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。
几种延时队列延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:
1.java中java.util.concurrent.delayqueue优点:jdk自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化
2.rocketmq延时队列优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息
3.rabbitmq延时队列(ttl+dlx实现)优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列
redis实现的延时消息队列适合的项目特点:spring框架管理对象有消息需求,但不想维护mq中间件有使用redis对消息持久化并没有很苛刻的要求redis实现的延时消息队列思路redis由于其自身的zset数据结构,本质就是set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?
试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个zset集合,以免造成性能浪费。
zset的排列效果如下图:
java代码实现如下:
package cn.chinotan.service.delayqueueredis;import org.apache.commons.lang3.stringutils;import redis.clients.jedis.jedis;import redis.clients.jedis.jedispool;import redis.clients.jedis.tuple;import java.text.simpledateformat;import java.util.calendar;import java.util.date;import java.util.set;import java.util.concurrent.countdownlatch;import java.util.concurrent.timeunit;/** * @program: test * @description: redis实现延时队列 * @author: xingcheng * @create: 2018-08-19 **/public class apptest { private static final string addr = "127.0.0.1"; private static final int port = 6379; private static jedispool jedispool = new jedispool(addr, port); private static countdownlatch cdl = new countdownlatch(10); public static jedis getjedis() { return jedispool.getresource(); } /** * 生产者,生成5个订单 */ public void productiondelaymessage() { for (int i = 0; i < 5; i++) { calendar instance = calendar.getinstance(); // 3秒后执行 instance.add(calendar.second, 3 + i); apptest.getjedis().zadd("orderid", (instance.gettimeinmillis()) / 1000, stringutils.join("000000000", i + 1)); system.out.println("生产订单: " + stringutils.join("000000000", i + 1) + " 当前时间:" + new simpledateformat("yyyy-mm-dd hh:mm:ss").format(new date())); system.out.println((3 + i) + "秒后执行"); } } //消费者,取订单 public static void consumerdelaymessage() { jedis jedis = apptest.getjedis(); while (true) { set<tuple> order = jedis.zrangewithscores("orderid", 0, 0); if (order == null || order.isempty()) { system.out.println("当前没有等待的任务"); try { timeunit.microseconds.sleep(500); } catch (interruptedexception e) { e.printstacktrace(); } continue; } tuple tuple = (tuple) order.toarray()[0]; double score = tuple.getscore(); calendar instance = calendar.getinstance(); long nowtime = instance.gettimeinmillis() / 1000; if (nowtime >= score) { string element = tuple.getelement(); long orderid = jedis.zrem("orderid", element); if (orderid > 0) { system.out.println(new simpledateformat("yyyy-mm-dd hh:mm:ss").format(new date()) + ":redis消费了一个任务:消费的订单orderid为" + element); } } } } static class delaymessage implements runnable{ @override public void run() { try { cdl.await(); consumerdelaymessage(); } catch (interruptedexception e) { e.printstacktrace(); } } } public static void main(string[] args) { apptest apptest = new apptest(); apptest.productiondelaymessage(); for (int i = 0; i < 10; i++) { new thread(new delaymessage()).start(); cdl.countdown(); } }}
实现效果如下:
以上就是详解redis和队列的详细内容。