资讯专栏INFORMATION COLUMN

PHP下kafka的常用脚本实践

caiyongji / 1636人阅读

摘要:阅读本教程前最好先尝试阅读下的实践自带命令实践尝试实践的知识创建话题生产消息消费消息话题信息获取消费组获取消费组的自带的命令安装目录的目录下代表我们会使用的脚本

阅读本教程前最好先尝试阅读:PHP下kafka的实践

自带命令实践 尝试实践的kafka知识:

创建话题

生产消息

消费消息

话题信息

获取消费组

获取消费组的offset

自带的命令

</>复制代码

  1. # kafka安装目录的bin目录下
  2. # * 代表我们会使用的脚本
  3. connect-distributed.sh kafka-log-dirs.sh kafka-streams-application-reset.sh
  4. connect-standalone.sh kafka-mirror-maker.sh kafka-topics.sh*
  5. kafka-acls.sh kafka-preferred-replica-election.sh kafka-verifiable-consumer.sh
  6. kafka-broker-api-versions.sh kafka-producer-perf-test.sh kafka-verifiable-producer.sh
  7. kafka-configs.sh kafka-reassign-partitions.sh trogdor.sh
  8. kafka-console-consumer.sh* kafka-replay-log-producer.sh windows
  9. kafka-console-producer.sh* kafka-replica-verification.sh zookeeper-security-migration.sh
  10. kafka-consumer-groups.sh* kafka-run-class.sh zookeeper-server-start.sh
  11. kafka-consumer-perf-test.sh kafka-server-start.sh zookeeper-server-stop.sh
  12. kafka-delegation-tokens.sh kafka-server-stop.sh zookeeper-shell.sh*
  13. kafka-delete-records.sh kafka-simple-consumer-shell.sh
创建话题(kafka-topics.sh)

</>复制代码

  1. # 创建1个分区1个副本的test话题,这里是副本其实可以理解为broker里至少拥有数量,must >=1
  2. # --zookeeper localhost:2181kafka的默认端口:2181
  3. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  4. # 创建2个分区1个副本的test02话题
  5. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test02
  6. # 列出所有话题
  7. bin/kafka-topics.sh --list --zookeeper localhost:2181
  8. __consumer_offsets
  9. test
  10. test02
  11. # 注意这里的__consumer_offsets是kafka默认创建的,用于存储kafka消费记录的话题,我们暂时不用理会
  12. # 列出具体话题的信息
  13. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
  14. Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
  15. Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  16. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02
  17. Topic:test02 PartitionCount:2 ReplicationFactor:1 Configs:
  18. Topic: test02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  19. Topic: test02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
  20. # 从上面的显示 我们可以发现,第一句是显示总体信息,下面缩进显示的是分区信息
消费者(kafka-console-consumer.sh)

</>复制代码

  1. # 启动一个消费组消费,这里我们需要开启一个shell终端,因为会等待输出
  2. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  3. # 等待输出
生产者(kafka-console-consumer.sh)

</>复制代码

  1. # 这里我们需要开启一个shell终端,因为会等待输入
  2. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  3. # 待我们输入消息 等待出现>
  4. > msg01
  5. > msg02
  6. > msg03
  7. #注意观察上面的消费者终端,自动输出了我们的消息
  8. msg01
  9. msg02
  10. msg03
查看消费组

</>复制代码

  1. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  2. Note: This will not show information about old Zookeeper-based consumers.
  3. console-consumer-25379
  4. console-consumer-73410
  5. console-consumer-27127
  6. console-consumer-61887
  7. console-consumer-61324
  8. # 这里我们再来起一个消费者再次输出(因为之前的消费者我们不知道最新的这次消费组id是多少)
  9. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  10. Note: This will not show information about old Zookeeper-based consumers.
  11. console-consumer-25379
  12. console-consumer-73410
  13. console-consumer-27127
  14. console-consumer-39416 # 这个是我们新起的消费组id,下面我们根据这个消费组来做实践
  15. console-consumer-61887
  16. console-consumer-61324
  17. # 查看消费组的具体信息
  18. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-39416
  19. Note: This will not show information about old Zookeeper-based consumers.
  20. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  21. test 0 9 9 0 consumer-1-94afec29-5042-4108-8619-ba94812f10a8 /127.0.0.1 consumer-1
  22. # 查看离线的console-consumer-25379消费组
  23. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-25379
  24. Note: This will not show information about old Zookeeper-based consumers.
  25. Consumer group "console-consumer-25379" has no active members.
  26. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  27. test 0 5 9 4 - - -
  28. # 查看离线的console-consumer-27127消费组
  29. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-27127
  30. Note: This will not show information about old Zookeeper-based consumers.
  31. Consumer group "console-consumer-27127" has no active members.
  32. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  33. test 0 6 9 3 - - -
  34. # 这里我们发现我们每次都会生成一个消费组一个消费者,不方便我们做一个消费组多个消费者测试
启动一个消费组多个消费者

</>复制代码

  1. cp config/consumer.properties config/consumer_g1.properties
  2. vim config/consumer_g1.properties
  3. # 修改消费组名
  4. group.id=test-consumer-group => group.id=test-g1
  5. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties
  6. msg01
  7. msg02
  8. msg03
  9. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties
  10. # 无输出(因为test话题我们只有一个分区,一个分区只能被同个消费组下面的一个消费者消费,所以这个就闲置了)
  11. # 查看消费组
  12. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  13. Note: This will not show information about old Zookeeper-based consumers.
  14. console-consumer-25379
  15. console-consumer-73410
  16. console-consumer-27127
  17. console-consumer-39416
  18. test-g1
  19. console-consumer-61887
  20. console-consumer-61324
  21. # 查看test-g1消费组
  22. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1
  23. Note: This will not show information about old Zookeeper-based consumers.
  24. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  25. test 0 9 9 0 consumer-1-43922b0c-34e0-47fe-b597-984c9e6a2884 /127.0.0.1 consumer-1
  26. # 下面我们再开启test02的2个消费组看看情况
  27. # 我们再为test02话题启动2个test-g1消费组的消费者
  28. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties
  29. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties
  30. # 查看test-g1消费组
  31. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1
  32. Note: This will not show information about old Zookeeper-based consumers.
  33. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  34. test02 0 0 0 0 consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8 /127.0.0.1 consumer-1
  35. test02 1 0 0 0 consumer-1-c6d79321-8212-4594-8a11-353f684c54fc /127.0.0.1 consumer-1
  36. test 0 9 9 0 consumer-1-7a2706f7-a206-4c29-ae1f-3726ad21af96 /127.0.0.1 consumer-1
  37. # 这里你可以在话题test产生一个消息看下,然后再test02再多产生几条消息看下,你会发现test02的2个消费组几乎是负载的消费了消息
消费组的一些信息命令

</>复制代码

  1. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
  2. Note: This will not show information about old Zookeeper-based consumers.
  3. CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
  4. consumer-1-b8eeb8bc-6aa6-439b-84d8-0fcbed4a2899 /127.0.0.1 consumer-1 1 test02(1)
  5. consumer-1-7109f789-a5cf-4862-94d0-976146dbc769 /127.0.0.1 consumer-1 1 test(0)
  6. consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8 /127.0.0.1 consumer-1 1 test02(0)
  7. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
  8. Note: This will not show information about old Zookeeper-based consumers.
  9. COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
  10. localhost:9092 (0) range Stable 3
  11. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-25379
zookeeper(zookeeper-shell.sh) 连接

</>复制代码

  1. bin/zookeeper-shell.sh 127.0.0.1:2181
  2. Connecting to 127.0.0.1:2181
  3. Welcome to ZooKeeper!
  4. JLine support is disabled
  5. WATCHER::
  6. WatchedEvent state:SyncConnected type:None path:null
常用命令

</>复制代码

  1. ls /
  2. [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29291.html

相关文章

  • PHPkafka实践

    摘要:消息以为类别记录将消息种子分类每一类的消息称之为一个主题。这意味着生产者不等待来自同步完成的确认继续发送下一条批消息。这意味着在已成功收到的数据并得到确认后发送下一条。三种机制,性能依次递减吞吐量降低,数据健壮性则依次递增。 kafka 简介 Kafka 是一种高吞吐量的分布式发布订阅消息系统 kafka角色必知 producer:生产者。 consumer:消费者。 topic: 消...

    Codeing_ls 评论0 收藏0
  • 2018最新后端开发人员路线图

    摘要:简评之前,后端开发路线图仅仅是一个技术推荐,且没有明确的方向指明应该遵循的顺序,这份重新制作的指南将会给你一个更好的方向。现在开始创建一个包并分发给其他人使用,并确保遵循迄今为止学到的标准和最佳实践。 简评:之前,后端开发路线图仅仅是一个技术推荐,且没有明确的方向指明应该遵循的顺序,这份重新制作的指南将会给你一个更好的方向。 现在的 Web 开发与几年前完全不同了,有很多不同的东西可以...

    王陆宽 评论0 收藏0

发表评论

0条评论

caiyongji

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<