资讯专栏INFORMATION COLUMN

Kafka - PHP 使用 Rdkafka 生产/消费数据

BetaRabbit / 3565人阅读

摘要:集群部署安装依赖可以参阅支持的客户端版本生产者连接集群,创建,生产数据。链接集群创建消费者自动分配,,。消费者指定消费。没有消费组的概念,也可以认为每个消费者都属于一个独立消费组。

Kafka集群部署

安装rdkafka

rdkafka 依赖 libkafka

yum install rdkafka rdkafka-devel
pecl install rdkafka
php --ri rdkafka

http://pecl.php.net/package/r... 可以参阅支持的kafka客户端版本

生产者

连接集群,创建 topic,生产数据。

setLogLevel(LOG_DEBUG);

// 链接kafka集群
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

// 创建topic
$topic = $rk->newTopic("topic_1");

while (true) {
    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL;

    try {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        sleep(2);
    } catch (Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }
}
消费者-HighLevel

自动分配partitionrebalancecomsumer group

setRebalanceCb(function (RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;
        default:
            throw new Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set("group.id", "group_1");

// Initial list of Kafka brokers
$conf->set("metadata.broker.list", "192.168.20.6:9092,192.168.20.6:9093");

$topicConf = new RdKafkaTopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// "smallest": start from the beginning
$topicConf->set("auto.offset.reset", "smallest");

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafkaKafkaConsumer($conf);

// Subscribe to topic "topic_1"
$consumer->subscribe(["topic_1"]);

echo "Waiting for partition assignment... (make take some time when
";
echo "quickly re-joining the group after leaving it.)
";

while (true) {
    $message = $consumer->consume(3e3);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            sleep(2);
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}
消费者-LowLevel

指定partition消费。
php consumer_lowlevel.php [partitonNuo]
LowLevel 没有消费组的概念,也可以认为每个消费者都属于一个独立消费组。

set("group.id", "group_2");

$rk = new RdKafkaConsumer($conf);
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

$topicConf = new RdKafkaTopicConf();
$topicConf->set("auto.commit.interval.ms", 2000);

// Set the offset store method to "file"
// $topicConf->set("offset.store.method", "file");
// $topicConf->set("offset.store.path", sys_get_temp_dir());

// Alternatively, set the offset store method to "broker"
$topicConf->set("offset.store.method", "broker");

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// "smallest": start from the beginning
$topicConf->set("auto.offset.reset", "smallest");

$topic = $rk->newTopic($topic, $topicConf);

// Start consuming partition 0
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume($partition, 3 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new Exception($message->errstr(), $message->err);
            break;
    }
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/31031.html

相关文章

  • PHPkafka的实践

    摘要:消息以为类别记录将消息种子分类每一类的消息称之为一个主题。这意味着生产者不等待来自同步完成的确认继续发送下一条批消息。这意味着在已成功收到的数据并得到确认后发送下一条。三种机制,性能依次递减吞吐量降低,数据健壮性则依次递增。 kafka 简介 Kafka 是一种高吞吐量的分布式发布订阅消息系统 kafka角色必知 producer:生产者。 consumer:消费者。 topic: 消...

    Codeing_ls 评论0 收藏0
  • PHP 使用 Kafka 安装拾遗

    摘要:最近项目开发中需要使用消息队列。不过在环境中安装的过程中出现了以下报错开始以为是因为安装缺少了一些依赖。然后使用了源码编译的方式进行安装同样报错了。然后安装它再执行,执行。扩展包使用纯粹的编写的客户端,目前支持以上版本的。 最近项目开发中需要使用 Kafka 消息队列。经过检索,PHP下面有通用的两种方式来调用 Kafka 。 php-rdkafka 扩展 以 PHP 扩展的形式进行...

    SimonMa 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<