您好,欢迎访问一九零五行业门户网

RibbetMQ php扩展使用 实现队列生产消费

ribbetmqphp扩展使用实现队列生产消费 无 一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器http://www.rabbitmq.com/download.html默认的端口是55672 访问地址http://127.0.0.
ribbetmq php扩展使用 实现队列生产消费 一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器http://www.rabbitmq.com/download.html默认的端口是55672 访问地址http://127.0.0.1:55672/默认帐号密码 guest guest你可以看到rabbitmq 的管理界面mq的任务是一个不浪费资源,的一个队列系统! php使用需要下载一个amqp扩展 或者直接点击下面的地址找到适合自己的版本,下载 http://pecl.php.net/package/amqp/1.2.0/windows rabbitmq.1.dll 放在c盘windows下 php_amqp.dll 放入php扩展中 开启php_amqp.dll的引用 重启服务器用phpinfo();查看是否引用成功,如果出现以下的amqp扩展,那就说明成功了首先是rabbitmq的生产者: 创建第一个index文件:然后去mq中查看,如果添加一个test001的队列名信息,就说明已经添加进去了,xx22的信息已经在mq中存储! 接下来就需要跑数据了。 createqueue(array('xxx','2222'),'test001'); echo ok; function createqueue($message,$queuename,$exchangename = '', $queuekey = '') { $queuename = self::getqueuename($queuename); $conn_args = array('host' =>'localhost', 'port'=> '5672', 'login' =>'guest', //mq帐号 'password'=> '', //mq密码 'vhost' => '/'); $conn = new amqpconnection($conn_args); $conn->connect(); $channel = new amqpchannel($conn); if (!$exchangename) { $exchangename = $queuename; } $queuename = $queuename; if (!$queuekey) { $queuekey = $queuename; } $ex = new amqpexchange($channel); $ex->setname($exchangename); $ex->settype(amqp_ex_type_topic); $ex->setflags(amqp_durable); //exchange持久化 $ex->declareexchange(); $q = new amqpqueue($channel); $q->setname($queuename); $q->setflags(amqp_durable); //queue持久化 $q->declarequeue(); $q->bind($exchangename, $queuekey); $channel->starttransaction(); /** * 消息持久化,delivery_mode:2持久化、delivery_mode:1非持久化,其中priority是设置消息的优先级,测试中发现并未起作用。 * 消息还有其他属性,请参考http://www.php.net/manual/zh/amqpexchange.publish.php */ $result = $ex->publish(json_encode($message), $queuekey, amqp_noparam, array('delivery_mode'=>2, 'priority'=> 9)); $channel->committransaction(); $conn->disconnect(); } 有了生产者,那就有消费者。脚本如果没有其他的修改或问题,基本上都是常年启动的:消费者基类: class workercommand{ function qinit($q_name,$e_name='',$k_route=''){ $q_name = utils::getqueuename($q_name); $conn_args = array( 'host' => '127.0.0.1', //mq的配置 'port' => '5672', 'login' => 'guest', 'password' => 'huoxingxing', 'vhost' => '/' ); //创建连接和channel $conn = new amqpconnection($conn_args); if (!$conn->connect()) { die(cannot connect to the broker!\n); } $channel = new amqpchannel($conn); //创建交换机 $ex = new amqpexchange($channel); if (!$e_name) { $e_name = $q_name; } $ex->setname($e_name); $ex->settype(amqp_ex_type_direct); //direct类型 $ex->setflags(amqp_durable); //持久化 // echo exchange status: . $ex->declareexchange() . \n; //创建队列 $q = new amqpqueue($channel); $q->setname($q_name); $q->setflags(amqp_durable); //持久化 // echo message total: . $q->declareexchange() . \n; if (!$k_route) { $k_route = $q_name; } //绑定交换机与队列,并指定路由键 // echo 'queue bind: ' . $q->declarequeue($e_name, $k_route) . \n; //阻塞模式接收消息 echo message:\n; while (true) { $q->consume(array($this,'processmessage')); //$q->consume('processmessage', amqp_autoack); //自动ack应答 } $conn->disconnect(); }} 消费者:class workerwaresyncbackupcommand extends workercommand { function actionindex() { $this->qinit('syncwarebackup'); } function processmessage($envelope, $queue) { $msg = json_decode($envelope->getbody()); utils::dobackup('back',$msg,''); $queue->ack($envelope->getdeliverytag()); //手动发送ack应答 }}
其它类似信息

推荐信息