您好,欢迎访问一九零五行业门户网

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)_PHP教程

1、amqp_ex_type_direct:直连型直连型又包括: 1对1 和1对n(n对1、 n对n)
接收端receive.php代码如下connect();$channel = new amqpchannel($connect);$exchange = new amqpexchange($channel);$exchange->setname('exchange');$exchange->settype(amqp_ex_type_direct);$exchange->declare();$queue = new amqpqueue($channel);$queue->setname('logs');$queue->declare();$queue->bind('exchange', 'logs');while (true) { $queue->consume('callback');}$connection->close();function callback($envelope, $queue) { var_dump($envelope->getbody()); $queue->nack($envelope->getdeliverytag());}
发送端send.php代码如下
connect();$channel = new amqpchannel($connect);$exchange = new amqpexchange($channel);$exchange->setname('exchange');$exchange->settype(amqp_ex_type_direct);$exchange->declare();$exchange->publish('direct type test','logs');var_dump(send message ok);$connect->disconnect();
运行结果如图所示
创建receive_one.php和receive_two.php 并把send.php代码改成如下代码方便我们观看receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端
connect();$channel = new amqpchannel($connect);$exchange = new amqpexchange($channel);$exchange->setname('exchange');$exchange->settype(amqp_ex_type_direct);$exchange->declare();$queue = new amqpqueue($channel);$queue->setname('logs');@$queue->declare();$queue->bind('exchange', 'logs');while (true) { $queue->consume('callback');}$connection->close();function callback($envelope, $queue) { var_dump($envelope->getbody()); $queue->nack($envelope->getdeliverytag());}
send.php
connect();$channel = new amqpchannel($connect);$exchange = new amqpexchange($channel);$exchange->setname('exchange');$exchange->settype(amqp_ex_type_direct);$exchange->declare();for ($index = 1; $index publish($index,'logs'); var_dump(send:$index);}$exchange->delete();$connect->disconnect();
运行结果如下
列队会把消息分配给每一个接收端分配处理这里看似完美但是如果想要更好的处理不同的任务就需要 公平调度
比如当1、3处理的都是简单的人 2、4都是处理的复杂的任务 如果任务过多时 receive_one.php是空闲的而receive_two.php是任务繁重的我们进行如下测试send.php改成5改成50for ($index = 1; $index publish($index,'logs'); var_dump(send:$index);}
receive_two.php 加上 sleep(3)
function callback($envelope, $queue) { var_dump($envelope->getbody()); sleep(3); $queue->nack($envelope->getdeliverytag());}
我们运行程序结果如下
receive_one全部运行完而receive_two才运行一个 之后receive_one一直空闲我们可以通过 在接收端设置$channel->setprefetchcount(1);
任务没人完成前不接收新的消息把消息发送给其他接收端
如下receive_one.php 和 receive_two.php$channel = new amqpchannel($connect);
改成如下
$channel = new amqpchannel($connect);$channel->setprefetchcount(1);
http://www.bkjia.com/phpjc/735888.htmlwww.bkjia.comtruehttp://www.bkjia.com/phpjc/735888.htmltecharticle1、amqp_ex_type_direct:直连型 直连型又包括: 1对1 和1对n(n对1、 n对n) 接收端receive.php代码如下 connect();$channel = new amqpchannel($connect);$exchange...
其它类似信息

推荐信息