摘要:介绍是一个分布式的流数据平台,可发布订阅消息流,使用进行集群管理。启动一个,拉取消息参数表示从头开始读取数据,如果不设置,则只读取最新的数据。消息发布者,方式,负责发布消息到。表明消息,被同一内的均分了。
介绍
Kafka是一个分布式的流数据平台,可发布、订阅消息流,使用zookeeper进行集群管理。也可作为一个消息队列中间件,类似于RabbitMQ,ActiveMQ,ZeroMQ等。由LinkdIn开源,用Scala语言实现。
Kafka有如下特点:
kafka利用线性存储来进行硬盘读写,速度快;
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。因此不清除存储的数据并不会影响性能;
zero-copy Gzip和Snappy压缩
已消费的消息不会自动删除
考虑到高效性,对事务的支持较弱。
应用场景 安装使用// 从官网下载最新版本,这里为:kafka_2.11-1.0.0.tgz
// 解压
$ tar -xzf kafka_2.11-1.0.0.tgz $ cd kafka_2.11-1.0.0
// Kafka用到了zookeeper,所以需要启动zookeeper(新版本内置了zookeeper,如果读者已有其他zookeeper启动了,这步可以略过)
$ bin/zookeeper-server-start.sh config/zookeeper.properties
// 修改配置文件,并启动kafka server:
config/server.properties中的zookeeper.connect默认为localhost:2181,可以修改为其他的zookeeper地址。多个地址间,通过逗号分隔,如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
默认为9092端口,通过修改“listeners=PLAINTEXT://:9092” 来指定其他端口或IP。
配置好后,启动kafka server:
$ bin/kafka-server-start.sh config/server.properties
配置文件目录下还有consumer.properties和producer.properties,按默认即可。
// 创建一个topic,topic名称为test
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以通过命令:bin/kafka-topics.sh --list --zookeeper localhost:2181查看当前所有的topic.
// 通过producer发送消息
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
往test中发送数据。
// 启动一个consumer,拉取消息
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
--from-beginning参数表示从头开始读取数据,如果不设置,则只读取最新的数据。
// 可以再启动一个Server
$ bin/kafka-server-start.sh config/server-1.properties &
这里的server-1.properties是拷贝的server.properties,主要修改如下几个参数:
# broker id,整数,和其他broker不能重复 broker.id=2 # 指定端口为9094,因为在同一台机器上,需要避免端口冲突。这里没有配置IP,默认为本机 listeners=PLAINTEXT://:9094 # 日志文件路径,即topic数据的存储位置。不同的broker,指定不同的路径。 log.dir=/tmp/kafka-logs-2示意图
Producer1和Producer2往Topic A中发送消息,Consumer1/2/3/4/5 从Topic中接收消息。
Kafka Cluster包含两个Server,分别为Server1,Server2。
Topic A包含4个Partition,为:P0, P1, P3, P4,平均分配到Server1和Server2上。
一个Broker就是一个server。多个Broker构成一个kafka集群,同时对外提供服务,如果某个节点down掉,则重新分配。
注意:集群和主从热备不同,对于主从热备,同时只有一个节点提供服务,其他节点待命状态。
消息发布者,Push方式,负责发布消息到Kafka broker。
Consumer消费者,Pull方式,消费消息。每个consumer属于一个特定的consuer group。
主题(Topic)通过对消息指定主题可以将消息分类,Consumer可以只关注特定Topic中的消息。
查看总共有多少个Topic:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
查看某个topic的情况(分区、副本数等),这里查看topic为test的信息:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test消费者组(Consumer Group)
每个Consumer都会归到一个Group中。某个Partition中的消息,可以被多个Group消费,但只能被Group中的一个Consumer消费。所以,如果要对多个Consumer进行消息广播,则这些Consumer需要放在不同的Group中。
当一个Consumer进程或线程挂掉,它所订阅的Partition会被重新分配到该Group内的其他Consumer上。如果Consumer A订阅了多个Partition,那么当该Group内新增Consumer B时,会从Consumer A中分配出一个Partition给Consumer B。
为了维持Consumer 与 Consumer Group的关系,需要Consumer周期性的发送heartbeat到coordinator(协调者,在早期版本,以zookeeper作为协调者。后期版本则以某个broker作为协调者)。当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。而这个过程,被称为rebalance。
位移(Offset)Offset是针对Partition的,它用来记录消费到Partition中的哪条消息了。
Consumer并不维护Offset,而是由Consumer所在的Group维护。因此,Group中的一个Consumer消费了某个Partition中的消息,那么该组的其他Consumer就不能重复消费该条消息了,因为Offset已经+1了。
上图中,Consumer A和Consumer B属于不同的Group。Consumer A所在的Group,在该Partition的Offset=9,表示下次该Group获取消息时是从9开始获取;同理,Consumer B所在的Group在该Partition的Offset=11,下次该Group的Consumer获取消息时,从11开始获取。
分区(Partition)Partition是物理上的概念,每个Partition对应一个文件夹(默认在/tmp/kafka-logs下,通过server.properties中log.dirs配置)。一个topic可以对应多个partition,consumer订阅的其实就是partition。
上图表示一个Topic,指定了3个分区。在向该Topic写数据时,会根据均衡策略,往相应的分区中写。这3个分区中的数据是不一样的,它们的数据总和,构成该Topic的数据。
每个分区中的数据,保证严格的写入顺序。
分区会自动根据均衡策略分配到多个broker上。比如有2个broker(或者叫Server):broker1, broker2,创建一个包含4个partition且replication-factor(副本)为1的topic,那么对于该topic,每个broker会被分配2个partition。如下图:
有两个Group:Group A和Group B,其中Group A包含C1、C2两个Consumer;Group B包含C3,C4,C5,C6四个Consumer。
如果向该Topic写入4条信息:M1, M2, M3, M4。那么各个Consumer收到的消息是(一种情况):
C1:M1, M3 C2:M2, M4 C3:M1 C4:M3 C5:M2 C6:M4
C1,C2各接收到2条消息,它们的和为:M1,M2,M3,M4。
C3,C4,C5,C6各接收到1条消息,它们的和为:M1,M2,M3,M4。
表明Topic消息,被同一Group内的Consumer均分了。因为每次向Topic中写入消息时,会被均分至各个Partition,然后各Consumer收到自己所订阅Partition的消息。同时,这里也说明了同一个partition内的消息只能被同一个组中的一个consumer消费。
注:如果replication-factor为3,那么每个broker会有6(即2x3)个partition。
另外,创建topic时,在当前的所有broker间进行均分,分好后就不会变了。假设把上述broker1停掉,它的分区不会转到broker2上。producer在写消息时,不会再写入broker2中的分区。
那么,原先订阅broker2分区的consumer,不能接收消息了。提示:
WARN [Consumer clientId=consumer-1, groupId=g4] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
对于一个Topic的partition数,增加Broker(即服务节点)并不会增加partition的数量。
验证:
查看topic信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
再启动一个新的Broker:
$ bin/kafka-server-start.sh ../config/server-1.properties
启动后,再用上一步的命令看topic信息,partition数量并未改变。
并且,如果group g1上有两个consumer,始终只会有一个consumer能收到该topic的消息,另一个一直处于空闲状态(光占着资源不做事)。所以,Topic的Partition数,要大于等于Consumer数量。
默认组的疑问
可能读者会有疑问,通过命令:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
执行多次,创建了多个consumer,这些consumer属于默认的一个组,但是却能同时收到一个topic的信息。和上述所说的“一个Topic中的消息,只能被group中的一个consumer消费”有冲突。
其实,不指定group名称,的确会分配默认的group,但每次分配的名称是不一样的,即这里创建的consumer是属于不同的group的。可以通过命令查看所有group:
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Note: This will not show information about old Zookeeper-based consumers. console-consumer-94070 console-consumer-27823 console-consumer-4826 console-consumer-47050
可以看出,这里的group名称是不一样的。
consumer数量和group数量
对于一个topic,如果group中consumer数量比partition数量多,那么多余的consumer会空闲。这是因为,group中的某个consumer一旦订阅了某个partition,则会一直占用并消费该partition中的信息。除非该consumer退出,否则该partition不会被该组的其他consumer占用。所以会导致多余的consumer空闲,一直收不到消息。
可以通过命令,查看consumer和partition的对应关系:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g4
一个topic可以对应多个partition,但一个partition只能对应一个topic。
数据文件分段Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个多带带的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。
上图展示文件传输到Socket的常规方式,步骤:
操作系统将文件数据从磁盘读入到内核空间的页缓存;
应用程序将数据从内核空间读入到用户空间缓存中;
应用程序将数据写回到内核空间到socket缓存中;
操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出。
上图展示零拷贝方式传输文件到Socket,少了文件缓存到用户空间,再由用户空间到内核空间的操作。
Kafka采用零拷贝的方式。
通过Replication Factor指定副本的数量,这样,如果一个Partition出现了问题,那么可以从副本中恢复了。
Kafka Manager安装和使用如果不喜欢通过命令行操作,也可以通过图形化管理界面,比如yahoo开源的Kafka Manager。
地址:https://github.com/yahoo/kafk...
这里以CentOS7为例,进行编译、运行说明。
注:Kafka Manager的编译需要javac,需要安装jdk环境。最新版的需要jdk8版本。
CentOS7默认安装了OpenJDK,将其卸载,从Oracle官网下载jdk8文件,然后安装。
// github上下载kafka manager源码
$ git clone https://github.com/yahoo/kafk...
$ cd kafka-manager
// 修改配置文件中zookeeper地址
配置文件:conf/application.conf
kafka-manager.zkhosts="127.0.0.1:2181"
如果有多个zookeeper,通过逗号分隔,如:
kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
// 将源码编译打包成zip包
$ ./sbt clean dist
这一步用到了javac,运行完后,会在当前目录下生成target文件夹。生成zip包地址:
target/universal/kafka-manager-1.3.3.16.zip
// 进入zip所在目录,解压zip包,启动服务(默认9000端口)
$ cd target/universal
$ unzip kafka-manager-1.3.3.16
$ ./kafka-manager-1.3.3.16/bin/kafka-manager
// 打开Kafka Manager页面
浏览器输入地址:http://192.168.0.12:9000/ (这里的IP需要替换成读者自己的IP)
很简洁的一个页面,第一次打开,什么都没有。
// 添加一个Cluster
Cluster Name: 名称随意,比如MyFirstCluster
Cluster Zookeeper Hosts: zookeeper的地址,比如:192.168.0.12:2181
Kafka Version: 笔者选的0.11
勾选“Enable JMX Polling”。注意:勾选了该项,启动kafka server前,需要设置JMX_PORT变量,如:
$ JMX_PORT=9999 $ bin/zookeeper-server-start.sh config/zookeeper.properties
保存后,就可以通过MyFirstCluster,查看Broker, Topic, Partition, Consumer等信息了。
注:如果查看不了Consumer信息,提示“Please enable consumer polling here.”,需要勾选一个配置。如:
提示信息:
修改Cluster:
勾选中“Poll consumer information”
保存。具体的管理功能,可以通过操作页面进一步挖掘。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70973.html
摘要:关闭套接字和上下文备注说明如何利用使用首先下载所需的包,解压以后将和文件放到自己电脑中的安装路径中的文件夹下,最后需要将之前解压后的包放在项目的中或者资源下载链接密码项目源码下载链接链接密码 在讲ZeroMQ前先给大家讲一下什么是消息队列。 消息队列简介: 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是...
阅读 1898·2021-11-15 11:46
阅读 1097·2021-10-26 09:49
阅读 1831·2021-10-14 09:42
阅读 3389·2021-09-26 09:55
阅读 841·2019-08-30 13:58
阅读 1041·2019-08-29 16:40
阅读 3477·2019-08-26 10:27
阅读 614·2019-08-23 18:18