本篇文章主要介绍了java利用redis实现消息队列的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
本文介绍了java利用redis实现消息队列的示例代码,分享给大家,具体如下:
应用场景
为什么要用redis?
二进制存储、java序列化传输、io连接数高、连接频繁
一、序列化
这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了bytearrayoutputstream和bytearrayinputstream; 注意:每个需要序列化的对象都要实现serializable接口;
其代码如下:
package utils;
import java.io.*;
/**
* created by kinglf on 2016/10/17.
*/
public class objectutil {
/**
* 对象转byte[]
* @param obj
* @return
* @throws ioexception
*/
public static byte[] object2bytes(object obj) throws ioexception{
bytearrayoutputstream bo=new bytearrayoutputstream();
objectoutputstream oo=new objectoutputstream(bo);
oo.writeobject(obj);
byte[] bytes=bo.tobytearray();
bo.close();
oo.close();
return bytes;
}
/**
* byte[]转对象
* @param bytes
* @return
* @throws exception
*/
public static object bytes2object(byte[] bytes) throws exception{
bytearrayinputstream in=new bytearrayinputstream(bytes);
objectinputstream sin=new objectinputstream(in);
return sin.readobject();
}
}
二、消息类(实现serializable接口)
package model;
import java.io.serializable;
/**
* created by kinglf on 2016/10/17.
*/
public class message implements serializable {
private static final long serialversionuid = -389326121047047723l;
private int id;
private string content;
public message(int id, string content) {
this.id = id;
this.content = content;
}
public int getid() {
return id;
}
public void setid(int id) {
this.id = id;
}
public string getcontent() {
return content;
}
public void setcontent(string content) {
this.content = content;
}
}
三、redis的操作
利用redis做队列,我们采用的是redis中list的push和pop操作;
结合队列的特点:
只允许在一端插入新元素只能在队列的尾部fifo:先进先出原则 redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而redis中list药push或 pop的对象仅需要转换成byte[]即可
java采用jedis进行redis的存储和redis的连接池设置
上代码:
package utils;
import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispool;
import redis.clients.jedis.jedispoolconfig;
import java.util.list;
import java.util.map;
import java.util.set;
/**
* created by kinglf on 2016/10/17.
*/
public class jedisutil {
private static string jedis_ip;
private static int jedis_port;
private static string jedis_password;
private static jedispool jedispool;
static {
//configuration自行写的配置文件解析类,继承自properties
configuration conf=configuration.getinstance();
jedis_ip=conf.getstring("jedis.ip","127.0.0.1");
jedis_port=conf.getint("jedis.port",6379);
jedis_password=conf.getstring("jedis.password",null);
jedispoolconfig config=new jedispoolconfig();
config.setmaxactive(5000);
config.setmaxidle(256);
config.setmaxwait(5000l);
config.settestonborrow(true);
config.settestonreturn(true);
config.settestwhileidle(true);
config.setminevictableidletimemillis(60000l);
config.settimebetweenevictionrunsmillis(3000l);
config.setnumtestsperevictionrun(-1);
jedispool=new jedispool(config,jedis_ip,jedis_port,60000);
}
/**
* 获取数据
* @param key
* @return
*/
public static string get(string key){
string value=null;
jedis jedis=null;
try{
jedis=jedispool.getresource();
value=jedis.get(key);
}catch (exception e){
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
}finally {
close(jedis);
}
return value;
}
private static void close(jedis jedis) {
try{
jedispool.returnresource(jedis);
}catch (exception e){
if(jedis.isconnected()){
jedis.quit();
jedis.disconnect();
}
}
}
public static byte[] get(byte[] key){
byte[] value = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
value = jedis.get(key);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return value;
}
public static void set(byte[] key, byte[] value) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.set(key, value);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static void set(byte[] key, byte[] value, int time) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.set(key, value);
jedis.expire(key, time);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static void hset(byte[] key, byte[] field, byte[] value) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.hset(key, field, value);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static void hset(string key, string field, string value) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.hset(key, field, value);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 获取数据
*
* @param key
* @return
*/
public static string hget(string key, string field) {
string value = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
value = jedis.hget(key, field);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return value;
}
/**
* 获取数据
*
* @param key
* @return
*/
public static byte[] hget(byte[] key, byte[] field) {
byte[] value = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
value = jedis.hget(key, field);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return value;
}
public static void hdel(byte[] key, byte[] field) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.hdel(key, field);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 存储redis队列 顺序存储
* @param key reids键名
* @param value 键值
*/
public static void lpush(byte[] key, byte[] value) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.lpush(key, value);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 存储redis队列 反向存储
* @param key reids键名
* @param value 键值
*/
public static void rpush(byte[] key, byte[] value) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.rpush(key, value);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
* @param key reids键名
* @param destination 键值
*/
public static void rpoplpush(byte[] key, byte[] destination) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.rpoplpush(key, destination);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 获取队列数据
* @param key 键名
* @return
*/
public static list lpoplist(byte[] key) {
list list = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
list = jedis.lrange(key, 0, -1);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return list;
}
/**
* 获取队列数据
* @param key 键名
* @return
*/
public static byte[] rpop(byte[] key) {
byte[] bytes = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
bytes = jedis.rpop(key);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return bytes;
}
public static void hmset(object key, map hash) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.hmset(key.tostring(), hash);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static void hmset(object key, map hash, int time) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.hmset(key.tostring(), hash);
jedis.expire(key.tostring(), time);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static list hmget(object key, string... fields) {
list result = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
result = jedis.hmget(key.tostring(), fields);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return result;
}
public static set hkeys(string key) {
set result = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
result = jedis.hkeys(key);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return result;
}
public static list lrange(byte[] key, int from, int to) {
list result = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
result = jedis.lrange(key, from, to);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return result;
}
public static map hgetall(byte[] key) {
map result = null;
jedis jedis = null;
try {
jedis = jedispool.getresource();
result = jedis.hgetall(key);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return result;
}
public static void del(byte[] key) {
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.del(key);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static long llen(byte[] key) {
long len = 0;
jedis jedis = null;
try {
jedis = jedispool.getresource();
jedis.llen(key);
} catch (exception e) {
//释放redis对象
jedispool.returnbrokenresource(jedis);
e.printstacktrace();
} finally {
//返还到连接池
close(jedis);
}
return len;
}
}
四、configuration主要用于读取redis的配置信息
package utils;
import java.io.ioexception;
import java.io.inputstream;
import java.util.properties;
/**
* created by kinglf on 2016/10/17.
*/
public class configuration extends properties {
private static final long serialversionuid = -2296275030489943706l;
private static configuration instance = null;
public static synchronized configuration getinstance() {
if (instance == null) {
instance = new configuration();
}
return instance;
}
public string getproperty(string key, string defaultvalue) {
string val = getproperty(key);
return (val == null || val.isempty()) ? defaultvalue : val;
}
public string getstring(string name, string defaultvalue) {
return this.getproperty(name, defaultvalue);
}
public int getint(string name, int defaultvalue) {
string val = this.getproperty(name);
return (val == null || val.isempty()) ? defaultvalue : integer.parseint(val);
}
public long getlong(string name, long defaultvalue) {
string val = this.getproperty(name);
return (val == null || val.isempty()) ? defaultvalue : integer.parseint(val);
}
public float getfloat(string name, float defaultvalue) {
string val = this.getproperty(name);
return (val == null || val.isempty()) ? defaultvalue : float.parsefloat(val);
}
public double getdouble(string name, double defaultvalue) {
string val = this.getproperty(name);
return (val == null || val.isempty()) ? defaultvalue : double.parsedouble(val);
}
public byte getbyte(string name, byte defaultvalue) {
string val = this.getproperty(name);
return (val == null || val.isempty()) ? defaultvalue : byte.parsebyte(val);
}
public configuration() {
inputstream in = classloader.getsystemclassloader().getresourceasstream("config.xml");
try {
this.loadfromxml(in);
in.close();
} catch (ioexception ioe) {
}
}
}
五、测试
import model.message;
import utils.jedisutil;
import utils.objectutil;
import redis.clients.jedis.jedis;
import java.io.ioexception;
/**
* created by kinglf on 2016/10/17.
*/
public class testredisqueue {
public static byte[] rediskey = "key".getbytes();
static {
try {
init();
} catch (ioexception e) {
e.printstacktrace();
}
}
private static void init() throws ioexception {
for (int i = 0; i < 1000000; i++) {
message message = new message(i, "这是第" + i + "个内容");
jedisutil.lpush(rediskey, objectutil.object2bytes(message));
}
}
public static void main(string[] args) {
try {
pop();
} catch (exception e) {
e.printstacktrace();
}
}
private static void pop() throws exception {
byte[] bytes = jedisutil.rpop(rediskey);
message msg = (message) objectutil.bytes2object(bytes);
if (msg != null) {
system.out.println(msg.getid() + "----" + msg.getcontent());
}
}
}
每执行一次pop()方法,结果如下:
<br>1----这是第1个内容
<br>2----这是第2个内容
<br>3----这是第3个内容
<br>4----这是第4个内容
总结
至此,整个redis消息队列的生产者和消费者代码已经完成
1.message 需要传送的实体类(需实现serializable接口)
2.configuration redis的配置读取类,继承自properties
3.objectutil 将对象和byte数组双向转换的工具类
4.jedis 通过消息队列的先进先出(fifo)的特点结合redis的list中的push和pop操作进行封装的工具类
以上就是java如何使用redis来实现消息队列的具体分析的详细内容。
