这篇文章主要通过实例代码为大家详细介绍了如何在java 环境下使用 http 协议收发 mq 消息,需要的朋友可以参考下
1. 准备环境
在工程 pom 文件添加 http java 客户端的依赖。
<dependency>
<groupid>org.eclipse.jetty</groupid>
<artifactid>jetty-client</artifactid>
<version>9.3.4.rc1</version>
</dependency>
<dependency>
<groupid>com.aliyun.openservices</groupid>
<artifactid>ons-client</artifactid>
<version>1.1.11</version>
</dependency>
2. 运行代码配置(user.properties)
您需要设置配置文件(user.properties)的相关内容,具体请参考申请 mq 资源 。
#您在控制台创建的topic
topic=xxx
#公测url
url=http://publictest-rest.ons.aliyun.com
#阿里云身份验证码
ak=xxx
#阿里云身份验证密钥
sk=xxx
#mq控制台创建的producer id
producerid=xxx
#mq控制台创建的consumer id
consumerid=xxx
说明:url 中的 key,tag以及 post content-type 没有任何的限制,只要确保key 和 tag 相同唯一即可,可以放在 user.properties 里面。
3. http 发送消息示例代码
您可以按以下说明设置相应参数并测试 http 消息发送功能。
package com.aliyun.openservice.ons.http.demo;
import java.nio.charset.charset;
import java.util.date;
import java.util.properties;
import org.eclipse.jetty.client.httpclient;
import org.eclipse.jetty.client.api.contentprovider;
import org.eclipse.jetty.client.api.contentresponse;
import org.eclipse.jetty.client.api.request;
import org.eclipse.jetty.client.util.stringcontentprovider;
import com.aliyun.openservices.ons.api.impl.authority.authutil;
public class httpproducer {
public static string signature="signature";
public static string num="num";
public static string consumerid="consumerid";
public static string producerid="producerid";
public static string timeout="timeout";
public static string topic="topic";
public static string ak="accesskey";
public static string body="body";
public static string msghandle="msghandle";
public static string time="time";
public static void main(string[] args) throws exception {
httpclient httpclient=new httpclient();
httpclient.setmaxconnectionsperdestination(1);
httpclient.start();
properties properties=new properties();
properties.load(httpproducer.class.getclassloader().getresourceasstream("user.properties"));
string topic=properties.getproperty("topic"); //请在user.properties配置您的topic
string url=properties.getproperty("url");//公测集群配置为http://publictest-rest.ons.aliyun.com/
string ak=properties.getproperty("ak");//请在user.properties配置您的ak
string sk=properties.getproperty("sk");//请在user.properties配置您的sk
string pid=properties.getproperty("producerid");//请在user.properties配置您的producer id
string date=string.valueof(new date().gettime());
string sign=null;
string body="hello ons http";
string newline="\n";
string signstring;
for (int i = 0; i < 10; i++) {
date=string.valueof(new date().gettime());
request req=httpclient.post(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http");
contentprovider content=new stringcontentprovider(body);
req.content(content);
signstring=topic+newline+pid+newline+md5.getinstance().getmd5string(body)+newline+date;
system.out.println(signstring);
sign=authutil.calsignature(signstring.getbytes(charset.forname("utf-8")), sk);
req.header(signature, sign);
req.header(ak, ak);
req.header(producerid, pid);
contentresponse response;
response=req.send();
system.out.println("send msg:"+response.getstatus()+response.getcontentasstring());
}
}
}
4. http接收消息示例代码
请按以下说明设置相应参数并测试 http 消息接收功能。
package com.aliyun.openservice.ons.http.demo;
import java.nio.charset.charset;
import java.util.date;
import java.util.list;
import java.util.properties;
import org.eclipse.jetty.client.httpclient;
import org.eclipse.jetty.client.api.contentprovider;
import org.eclipse.jetty.client.api.contentresponse;
import org.eclipse.jetty.client.api.request;
import org.eclipse.jetty.client.util.stringcontentprovider;
import org.eclipse.jetty.http.httpmethod;
import com.alibaba.fastjson.json;
import com.aliyun.openservice.ons.mqtt.demo.mqttproducer;
import com.aliyun.openservices.ons.api.impl.authority.authutil;
public class httpconsumer {
public static string signature="signature";
public static string num="num";
public static string consumerid="consumerid";
public static string producerid="producerid";
public static string timeout="timeout";
public static string topic="topic";
public static string ak="accesskey";
public static string body="body";
public static string msghandle="msghandle";
public static string time="time";
public static void main(string[] args) throws exception {
httpclient httpclient=new httpclient();
httpclient.setmaxconnectionsperdestination(1);
httpclient.start();
properties properties=new properties();
properties.load(httpconsumer.class.getclassloader().getresourceasstream("user.properties"));
string topic=properties.getproperty("topic"); //请在user.properties配置您的topic
string url=properties.getproperty("url");//公测集群配置为http://publictest-rest.ons.aliyun.com/
string ak=properties.getproperty("ak");//请在user.properties配置您的ak
string sk=properties.getproperty("sk");//请在user.properties配置您的sk
string cid=properties.getproperty("consumerid");//请在user.properties配置您的consumer id
string date=string.valueof(new date().gettime());
string sign=null;
string newline="\n";
string signstring;
system.out.println(newline+newline);
while (true) {
try {
date=string.valueof(new date().gettime());
request req=httpclient.post(url+"message/?topic="+topic+"&time="+date+"&num="+32);
req.method(httpmethod.get);
contentresponse response;
signstring=topic+newline+cid+newline+date;
sign=authutil.calsignature(signstring.getbytes(charset.forname("utf-8")), sk);
req.header(signature, sign);
req.header(ak, ak);
req.header(consumerid, cid);
long start=system.currenttimemillis();
response=req.send();
system.out.println("get cost:"+(system.currenttimemillis()-start)/1000
+" "+response.getstatus()+" "+response.getcontentasstring());
list<simplemessage> list = null;
if (response.getcontentasstring()!=null&&!response.getcontentasstring().isempty()) {
list=json.parsearray(response.getcontentasstring(), simplemessage.class);
}
if (list==null||list.size()==0) {
thread.sleep(100);
continue;
}
system.out.println("size is :"+list.size());
for (simplemessage simplemessage : list) {
date=string.valueof(new date().gettime());
system.out.println("receive msg:"+simplemessage.getbody()+" born time "+simplemessage.getborntime());
req=httpclient.post(url+"message/?msghandle="+simplemessage.getmsghandle()+"&topic="+topic+"&time="+date);
req.method(httpmethod.delete);
signstring=topic+newline+cid+newline+simplemessage.getmsghandle()+newline+date;
sign=authutil.calsignature(signstring.getbytes(charset.forname("utf-8")), sk);
req.header(signature, sign);
req.header(ak, ak);
req.header(consumerid, cid);
response=req.send();
system.out.println("delete msg:"+response.tostring());
}
thread.sleep(100);
} catch (exception e) {
e.printstacktrace();
}
}
}
}
5. http示例程序工具类
(1)消息封装类: simplemessage.java
package com.aliyun.openservice.ons.http.demo;
public class simplemessage {
private string body;
private string msgid;
private string borntime;
private string msghandle;
private int reconsumetimes;
private string tag;
public void settag(string tag) {
this.tag = tag;
}
public string gettag() {
return tag;
}
public int getreconsumetimes() {
return reconsumetimes;
}
public void setreconsumetimes(int reconsumetimes) {
this.reconsumetimes = reconsumetimes;
}
public void setmsghandle(string msghandle) {
this.msghandle = msghandle;
}
public string getmsghandle() {
return msghandle;
}
public string getbody() {
return body;
}
public void setbody(string body) {
this.body = body;
}
public string getmsgid() {
return msgid;
}
public void setmsgid(string msgid) {
this.msgid = msgid;
}
public string getborntime() {
return borntime;
}
public void setborntime(string borntime) {
this.borntime = borntime;
}
}
(2)字符串签名类: md5.java
package com.aliyun.openservice.ons.http.demo;
import java.io.unsupportedencodingexception;
import java.nio.charset.charset;
import java.security.messagedigest;
import java.sql.sqlexception;
import java.util.date;
import java.util.hashmap;
import java.util.map;
import java.util.concurrent.concurrenthashmap;
import java.util.concurrent.locks.reentrantlock;
import org.slf4j.loggerfactory;
public class md5 {
private static final org.slf4j.logger log = loggerfactory.getlogger(md5.class);
private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
private static map<character, integer> rdigits = new hashmap<character, integer>(16);
static {
for (int i = 0; i < digits.length; ++i) {
rdigits.put(digits[i], i);
}
}
private static md5 me = new md5();
private messagedigest mhasher;
private final reentrantlock oplock = new reentrantlock();
private md5() {
try {
this.mhasher = messagedigest.getinstance("md5");
} catch (exception e) {
throw new runtimeexception(e);
}
}
public static md5 getinstance() {
return me;
}
public string getmd5string(string content) {
return this.bytes2string(this.hash(content));
}
public string getmd5string(byte[] content) {
return this.bytes2string(this.hash(content));
}
public byte[] getmd5bytes(byte[] content) {
return this.hash(content);
}
public byte[] hash(string str) {
this.oplock.lock();
try {
byte[] bt = this.mhasher.digest(str.getbytes("utf-8"));
if (null == bt || bt.length != 16) {
throw new illegalargumentexception("md5 need");
}
return bt;
} catch (unsupportedencodingexception e) {
throw new runtimeexception("unsupported utf-8 encoding", e);
} finally {
this.oplock.unlock();
}
}
public byte[] hash(byte[] data) {
this.oplock.lock();
try {
byte[] bt = this.mhasher.digest(data);
if (null == bt || bt.length != 16) {
throw new illegalargumentexception("md5 need");
}
return bt;
} finally {
this.oplock.unlock();
}
}
public string bytes2string(byte[] bt) {
int l = bt.length;
char[] out = new char[l << 1];
for (int i = 0, j = 0; i < l; i++) {
out[j++] = digits[(0xf0 & bt[i]) >>> 4];
out[j++] = digits[0x0f & bt[i]];
}
if (log.isdebugenabled()) {
log.debug("[hash]" + new string(out));
}
return new string(out);
}
public byte[] string2bytes(string str) {
if (null == str) {
throw new nullpointerexception("argument is not allowed empty");
}
if (str.length() != 32) {
throw new illegalargumentexception("string length must equals 32");
}
byte[] data = new byte[16];
char[] chs = str.tochararray();
for (int i = 0; i < 16; ++i) {
int h = rdigits.get(chs[i * 2]).intvalue();
int l = rdigits.get(chs[i * 2 + 1]).intvalue();
data[i] = (byte) ((h & 0x0f) << 4 | l & 0x0f);
}
return data;
}
}
希望本篇文章对您有所帮助
以上就是实现http协议收发mq 消息的java代码实例的详细内容。