700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 详解PHP实现生产者与消费者(Kafka应用)

详解PHP实现生产者与消费者(Kafka应用)

时间:2019-01-14 01:26:23

相关推荐

详解PHP实现生产者与消费者(Kafka应用)

后端开发|php教程

laravel,PHP

后端开发-php教程

js 防盗源码,vscode的vim跳转,ubuntu下compiz,tomcat总是自己关,ei爬虫,php png文字,贾汪区专业seo优化报价,录播教学网站源码,学校织梦cms网站模板下载lzw

网贷信用查询系统源码,ubuntu检测磁盘坏道,怎样监控爬虫状态,php时间戳(),武清seo咨询lzw

本篇文章给大家介绍PHP实现生产者与消费者,希望对需要的朋友有所帮助!

arpg源码,如何确定ubuntu版本,64位tomcat9,搜索爬虫软件,php中上传文件函数,布局SEOlzw

前言

PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。

生产者(测试)

创建消费者需要步骤:

生产者配置参数创建生产者实例创建主题实例(依赖生产者)生产主题消息推送消息

具体代码如下:

$conf = new \RdKafka\Conf(); // 绑定服务节点 $conf->set(metadata.broker.list, 127.0.0.1:32772); // 创建生产者 $kafka = new \RdKafka\Producer($conf); // 创建主题实例 $topic = $kafka->newTopic(p1r1); // 生产主题数据,此时消息在缓冲区中,并没有真正被推送 $topic->produce(RD_KAFKA_PARTITION_UA, 0, Message); // 阻塞时间(毫秒), 0为非阻塞 $kafka->poll(0); // 推送消息,如果不调用此函数,消息不会被发送且会丢失 $result = $kafka->flush(5000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException(Was unable to flush, messages might be lost!); }

消费者

创建一个消费者需要几个步骤:

消费者配置参数应用配置参数创建消费者实例订阅对应主题拉取数据提交位移

具体代码如下:

$conf = new \RdKafka\Conf(); // 绑定消费者组 $conf->set(group.id, ceshi); // 绑定服务节点,多个用,分隔 $conf->set(metadata.broker.list, 127.0.0.1:32787); // 设置自动提交为false $conf->set(mit, false); // 设置当前消费者拉取数据时的偏移量, 可选参数: // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。 // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。 $conf->set(auto.offset.reset, earliest); // 创建消费者实例 $consumer = new \RdKafka\KafkaConsumer($conf); // 消费者订阅主题,数组形式 $consumer->subscribe([ opic1, opic2]); while (true) { // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环) $message = $consumer->consume(5000); switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR: // 业务逻辑 var_dump($message); // 提交位移 $consumer->commit($message); break;case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break;case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break;default: throw new \Exception($message->errstr(), $message->err); break; } } // 关闭消费者(一般用在脚本中,不需要关闭) $conumser->close();

只消费指定分区中的数据:

// 对消费者指定分区,注意此方式不能与subscribe一同使用 $consumer->assign([ new RdKafka\TopicPartition("topic", 0), new RdKafka\TopicPartition("topic", 1), ]);

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。