摘要:集群部署安装依赖可以参阅支持的客户端版本生产者连接集群,创建,生产数据。链接集群创建消费者自动分配,,。消费者指定消费。没有消费组的概念,也可以认为每个消费者都属于一个独立消费组。
Kafka集群部署
安装rdkafkardkafka 依赖 libkafka
yum install rdkafka rdkafka-devel pecl install rdkafka php --ri rdkafka
http://pecl.php.net/package/r... 可以参阅支持的kafka客户端版本
生产者连接集群,创建 topic,生产数据。
setLogLevel(LOG_DEBUG); // 链接kafka集群 $rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093"); // 创建topic $topic = $rk->newTopic("topic_1"); while (true) { $message = "hello kafka " . date("Y-m-d H:i:s"); echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL; try { $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); sleep(2); } catch (Exception $e) { echo $e->getMessage() . PHP_EOL; } }消费者-HighLevel
自动分配partition,rebalance,comsumer group。
setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(null); break; default: throw new Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set("group.id", "group_1"); // Initial list of Kafka brokers $conf->set("metadata.broker.list", "192.168.20.6:9092,192.168.20.6:9093"); $topicConf = new RdKafkaTopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // "smallest": start from the beginning $topicConf->set("auto.offset.reset", "smallest"); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafkaKafkaConsumer($conf); // Subscribe to topic "topic_1" $consumer->subscribe(["topic_1"]); echo "Waiting for partition assignment... (make take some time when "; echo "quickly re-joining the group after leaving it.) "; while (true) { $message = $consumer->consume(3e3); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: sleep(2); case RD_KAFKA_RESP_ERR__TIMED_OUT: echo $message->errstr() . PHP_EOL; break; default: throw new Exception($message->errstr(), $message->err); break; } }消费者-LowLevel
指定partition消费。
php consumer_lowlevel.php [partitonNuo]
LowLevel 没有消费组的概念,也可以认为每个消费者都属于一个独立消费组。
set("group.id", "group_2"); $rk = new RdKafkaConsumer($conf); $rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093"); $topicConf = new RdKafkaTopicConf(); $topicConf->set("auto.commit.interval.ms", 2000); // Set the offset store method to "file" // $topicConf->set("offset.store.method", "file"); // $topicConf->set("offset.store.path", sys_get_temp_dir()); // Alternatively, set the offset store method to "broker" $topicConf->set("offset.store.method", "broker"); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // "smallest": start from the beginning $topicConf->set("auto.offset.reset", "smallest"); $topic = $rk->newTopic($topic, $topicConf); // Start consuming partition 0 $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume($partition, 3 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: case RD_KAFKA_RESP_ERR__TIMED_OUT: echo $message->errstr() . PHP_EOL; break; default: throw new Exception($message->errstr(), $message->err); break; } }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/31031.html
摘要:消息以为类别记录将消息种子分类每一类的消息称之为一个主题。这意味着生产者不等待来自同步完成的确认继续发送下一条批消息。这意味着在已成功收到的数据并得到确认后发送下一条。三种机制,性能依次递减吞吐量降低,数据健壮性则依次递增。 kafka 简介 Kafka 是一种高吞吐量的分布式发布订阅消息系统 kafka角色必知 producer:生产者。 consumer:消费者。 topic: 消...
摘要:最近项目开发中需要使用消息队列。不过在环境中安装的过程中出现了以下报错开始以为是因为安装缺少了一些依赖。然后使用了源码编译的方式进行安装同样报错了。然后安装它再执行,执行。扩展包使用纯粹的编写的客户端,目前支持以上版本的。 最近项目开发中需要使用 Kafka 消息队列。经过检索,PHP下面有通用的两种方式来调用 Kafka 。 php-rdkafka 扩展 以 PHP 扩展的形式进行...
阅读 496·2023-04-26 01:39
阅读 4330·2021-11-16 11:45
阅读 2560·2021-09-27 13:37
阅读 837·2021-09-01 10:50
阅读 3507·2021-08-16 10:50
阅读 2181·2019-08-30 15:55
阅读 2936·2019-08-30 15:55
阅读 2220·2019-08-30 14:07