资讯专栏INFORMATION COLUMN

python3连接kafka模块pykafka生产者简单封装

fizz / 2245人阅读

摘要:安装模块基本使用生产者简单封装初始化实例连接地址设置获取当前所有获取当前生产者对象发送数据需要传入的可迭代对象连接切换设置新的获取当前设置的获取所有要发送的可迭代对象引用来源博客园测试集群知乎使用生成器把写入效率提高倍

1.1安装模块
pip install pykafka
1.2基本使用
  # -* coding:utf8 *-  
  from pykafka import KafkaClient  
  host = "IP:9092, IP:9092, IP:9092"
  client = KafkaClient(hosts = host)  
  # 生产者  
  topicdocu = client.topics["my-topic"]  
  producer = topicdocu.get_producer()  
  for i in range(100):  
      print i  
      producer.produce("test message " + str(i ** 2))  
  producer.stop()
1.3简单封装
  class KafkaProduct():

      def __init__(self,hosts,topic):
          """
          初始化实例
          :param hosts: 连接地址
          :param topic:
          """
          self.__client = KafkaClient(hosts=hosts)
          self.__topic = self.__client.topics[topic.encode()]
  
      def __set_topic(self, topic):
          self.__topic = self.__client.topics[topic.encode()]
  
      def set_topic(self, topic):
          """
          设置topic
          :param topic:
          :return:
          """
          self.__set_topic(topic)
  
      def get_topics(self):
          """
          获取当前所有topic
          :return:
          """
          return self.__client.topics
  
      def get_topic(self):
          """
          获取当前topic
          :return:
          """
          return self.__topic
  
      def Producer(self):
          """
          生产者对象
          :return:
          """
          with self.__topic.get_producer(delivery_reports=True) as producer:
              next_data = ""
              while True:
                  if next_data:
                      producer.produce(str(next_data).encode())
                  next_data = yield True
  
      def send_data(self,datas):
          """
          发送数据
          :param datas:需要传入的可迭代对象
          :return:
          """
          c = self.Producer()
          next(c)
          for i in datas:
              c.send(i)

if __name__ == "__main__":

    hosts = "1.2.3.4:9999,2.3.4.5:9090" #连接hosts
    topic = "test_523"
    K = KafkaProduct(hosts=hosts, topic=topic)  #
    #K.set_topic("test")  #切换设置新的topic
    K.get_topic()  #获取当前设置的topic
    #K.get_topics() #获取所有topic
    data = range(10000)  #要发送的可迭代对象
    K.send_data(data)
1.4引用来源

博客园:Python测试Kafka集群(pykafka)

知乎:使用生成器把Kafka写入效率提高1000倍

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/43910.html

相关文章

  • FAQs Kafka消息队列 UKafka

    摘要:大量的和分区会严重影响集群性能。介绍可参考收到离线分区总数异常告警一般是某个节点宕机或者服务异常导致。若服务卡住,可在评估后在控制台重启该节点服务。若想了解当前请求延时情况,建议关注平均请求延时监控项。 FAQs本篇目录一个UKafka集群可以创建多少个Topic?如何增加Topic的副本数量(ReplicationFactor)?收到离线分区总数>=10.0个告警,离线分区总数是什么,怎么...

    ernest.wang 评论0 收藏2407
  • Kafka学习笔记之扫盲

    摘要:相关概念协议高级消息队列协议是一个标准开放的应用层的消息中间件协议。可以用命令与不同,不是线程安全的。手动提交执行相关逻辑提交注意点将写成单例模式,有助于减少端占用的资源。自身是线程安全的类,只要封装得当就能最恰当的发挥好的作用。 本文使用的Kafka版本0.11 先思考些问题: 我想分析一下用户行为(pageviews),以便我能设计出更好的广告位 我想对用户的搜索关键词进行统计,...

    GT 评论0 收藏0
  • Kafka】《Kafka权威指南》入门

    摘要:主题和分区的悄息通过主题进行分类。在给定的分区里,每个悄息的偏移量都是唯一的。消费者把每个分区最后读取的悄息偏移量保存在或上,如果悄费者关闭或重启,它的读取状态不会丢失。主题可以配置自己的保留策略,可以将悄息保留到不再使用它们为止。发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性。数据(消息)的发送...

    番茄西红柿 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<