您好,欢迎访问一九零五行业门户网

用PHP尝试RabbitMQ(amqp扩充)实现消息的发送和接收

用php尝试rabbitmq(amqp扩展)实现消息的发送和接收
消费者:接收消息
逻辑:
创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息
'192.168.1.93', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/');$e_name = 'e_linvo'; //交换机名$q_name = 'q_linvo'; //队列名$k_route = 'key_1'; //路由key//创建连接和channel$conn = new amqpconnection($conn_args);if (!$conn->connect()) { die(cannot connect to the broker!\n);}$channel = new amqpchannel($conn);//创建交换机$ex = new amqpexchange($channel);$ex->setname($e_name);$ex->settype(amqp_ex_type_direct); //direct类型$ex->setflags(amqp_durable); //持久化echo exchange status:.$ex->declare().\n;//创建队列$q = new amqpqueue($channel);$q->setname($q_name);$q->setflags(amqp_durable); //持久化echo message total:.$q->declare().\n;//绑定交换机与队列,并指定路由键echo 'queue bind: '.$q->bind($e_name, $k_route).\n;//阻塞模式接收消息echo message:\n;while(true){ $q->consume('processmessage'); //$q->consume('processmessage', amqp_autoack); //自动ack应答}$conn->disconnect();/*** 消费回调函数* 处理消息*/function processmessage($envelope, $queue) { $msg = $envelope->getbody(); echo $msg.\n; //处理消息 $queue->ack($envelope->getdeliverytag()); //手动发送ack应答}
生产者:发送消息
逻辑:
创建连接-->创建channel-->创建交换机对象-->发送消息
'192.168.1.93', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/');$e_name = 'e_linvo'; //交换机名//$q_name = 'q_linvo'; //无需队列名$k_route = 'key_1'; //路由key//创建连接和channel$conn = new amqpconnection($conn_args);if (!$conn->connect()) { die(cannot connect to the broker!\n);}$channel = new amqpchannel($conn);//消息内容$message = test message! 测试消息!;//创建交换机对象$ex = new amqpexchange($channel);$ex->setname($e_name);//发送消息//$channel->starttransaction(); //开始事务for($i=0; $ipublish($message, $k_route).\n;}//$channel->committransaction(); //提交事务$conn->disconnect();
需要注意的地方是:
queue对象有两个方法可用于取消息:consume和get。
前者是阻塞的,无消息时会被挂起,适合循环中使用;
后者则是非阻塞的,取消息时有则取,无则返回false。
测试截图
运行消费者:
运行生产者,发消息:
消费者接收到消息:
?
http://nonfu.me/p/9722.html
其它类似信息

推荐信息