您好,欢迎访问一九零五行业门户网

教你如何快速进行php+kafka的安装

我们学习了解了这么多关于php的知识,今天教你们如何快速进行php+kafka的安装,如果不会的“童鞋”,那就跟随本篇文章一起继续学习吧
1、 安装java,并设置相关的环境变量
> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz> tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz> mv java-se-7u75-ri/ /opt/> export java_home=/opt/java-se-7u75-ri> export path=$path:$java_home/bin:$java_home/jre/bin> export classpath=.:$java_home/lib:$java_home/lib/tools.jar#验证安装> java -verisonopenjdk version "1.7.0_75"openjdk runtime environment (build 1.7.0_75-b13)openjdk 64-bit server vm (build 24.75-b04, mixed mode)
2、安装kafka,这里以0.10.2版本为例
> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz> tar zxvf kafka_2.11-0.10.2.0.tgz> mv kafka_2.11-0.10.2.0/ /opt/kafka> cd /opt/kafka#启动zookeeper> bin/zookeeper-server-start.sh config/zookeeper.properties[2013-04-22 15:01:37,495] info reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.quorumpeerconfig)...#启动kafka> bin/kafka-server-start.sh config/server.properties[2013-04-22 15:01:47,028] info verifying properties (kafka.utils.verifiableproperties)[2013-04-22 15:01:47,051] info property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.verifiableproperties)...#尝试创建一个topic> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test> bin/kafka-topics.sh --list --zookeeper localhost:2181test#生产者写入消息> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testthis is a messagethis is another message#消费者消费消息> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginningthis is a messagethis is another message
3、安装kafka的c操作库
> wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz> tar zxvf v1.3.0.tar.gz> cd librdkafka-1.3.0> ./configure> make && make install
4、安装php的kafka扩展 ,这里选择php-rdkafka扩展 https://github.com/arnaud-lb/php-rdkafka
> wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz> tar 4.0.2.tar.gz> cd php-rdkafka-4.0.2> /opt/php7/bin/phpize> ./configure --with-php-config=/opt/php7/bin/php-config> make && make install
修改php.ini,加入 extension=rdkafka.so
5、安装rdkafka的ide代码提示文件
> composer create-project kwn/php-rdkafka-stubs php-rdkafka-stubs
以phpstrom为例,在你的项目的external libraries右键,选择configure php include paths,把刚刚的路径添加进来。
6、编写php测试代码
producer:
<?php$conf = new rdkafka\conf();$conf->set('log_level', log_err);$conf->set('debug', 'admin');$conf->set('metadata.broker.list', 'localhost:9092');//if you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.idempotence', 'true');$producer = new rdkafka\producer($conf);$topic = $producer->newtopic("test2");for ($i = 0; $i < 10; $i++) { $topic->produce(rd_kafka_partition_ua, 0, "message $i"); $producer->poll(0);}for ($flushretries = 0; $flushretries < 10; $flushretries++) { $result = $producer->flush(10000); if (rd_kafka_resp_err_no_error === $result) { break; }}if (rd_kafka_resp_err_no_error !== $result) { throw new \runtimeexception('was unable to flush, messages might be lost!');}
low-level consumer:
<?php$conf = new rdkafka\conf();$conf->set('log_level', log_err);$conf->set('debug', 'admin');// set the group id. this is required when storing offsets on the broker$conf->set('group.id', 'myconsumergroup');$rk = new rdkafka\consumer($conf);$rk->addbrokers("127.0.0.1");$topicconf = new rdkafka\topicconf();$topicconf->set('auto.commit.interval.ms', 100);// set the offset store method to 'file'$topicconf->set('offset.store.method', 'broker');// alternatively, set the offset store method to 'none'// $topicconf->set('offset.store.method', 'none');// set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning$topicconf->set('auto.offset.reset', 'smallest');$topic = $rk->newtopic("test2", $topicconf);// start consuming partition 0$topic->consumestart(0, rd_kafka_offset_stored);while (true) { $message = $topic->consume(0, 10000); switch ($message->err) { case rd_kafka_resp_err_no_error: print_r($message); break; case rd_kafka_resp_err__partition_eof: echo "no more messages; will wait for more\n"; break; case rd_kafka_resp_err__timed_out: echo "timed out\n"; break; default: throw new \exception($message->errstr(), $message->err); break; }}
high-level consumer:
<?php$conf = new rdkafka\conf();// set a rebalance callback to log partition assignments (optional)$conf->setrebalancecb(function (rdkafka\kafkaconsumer $kafka, $err, array $partitions = null) { switch ($err) { case rd_kafka_resp_err__assign_partitions: echo "assign: "; var_dump($partitions); $kafka->assign($partitions); break; case rd_kafka_resp_err__revoke_partitions: echo "revoke: "; var_dump($partitions); $kafka->assign(null); break; default: throw new \exception($err); }});// configure the group.id. all consumer with the same group.id will consume// different partitions.$conf->set('group.id', 'myconsumergroup');// initial list of kafka brokers$conf->set('metadata.broker.list', '127.0.0.1');// set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning$conf->set('auto.offset.reset', 'smallest');$consumer = new rdkafka\kafkaconsumer($conf);// subscribe to topic 'test2'$consumer->subscribe(['test2']);echo "waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";while (true) { $message = $consumer->consume(1000); switch ($message->err) { case rd_kafka_resp_err_no_error: print_r($message); break; case rd_kafka_resp_err__partition_eof: echo "no more messages; will wait for more\n"; break; case rd_kafka_resp_err__timed_out: echo "timed out\n"; break; default: throw new \exception($message->errstr(), $message->err); break; } sleep(2);}
推荐学习:《php视频教程》
以上就是教你如何快速进行php+kafka的安装的详细内容。
其它类似信息

推荐信息