资讯专栏INFORMATION COLUMN

KAFKA集群部署实践

IT那活儿 / 1700人阅读
KAFKA集群部署实践

点击上方“IT那活儿”,关注后了解更多内容,不管IT什么活儿,干就完了!!!



基本概念



消费者(Consumer):向topic注册,并且接收发布到这些topic的消息

生产者(Producer):向kafka的topic发布消息的程序

服务端(broker):kafka以一个拥有一台或多台服务器的集群运行着,每一台服务器称为broker

1. Kafka架构

生产者、kafka集群、消费者架构图

kafka集群中的消息,以topic 的形式组织排序,如下图所示:

2. 基本概念

2.1 主题(Topic)

对消息进行分类单元,通常在一个应用中对应一个Topic,以此进行区分。一个主题就是一个类别或者一个可订阅的条目名称。对每个topic来说,kafka维护的是一个分区日志(partitioned log)

2.2 分区(Partition)

每个分区是一个有序的、不可变的消息序列,这个序列可以被连续地追加提交日志。在分区内的每条消息都有一个有序的id号,这个id号被称为偏移offset,这个偏移量可以唯一确定每条消息在分区内的位置。

 工作图



安装步骤


1. zookeeper安装

服务器三台:安装均在三台服务器上统一操作。

192.168.48.130

192.168.48.131

192.168.48.132

注意:

zk3.5.5之后的版本,选择带bin 的为二进制安装包;

注意jdk版本,3.5.5之后的一直用jdk1.8版本。

1.1 安装java环境

yum -y install java-1.8.0-openjdk*

1.2 下载zookeeper并创建对应目录

cd /opt

mkdir zookeeper

mkdir -p zookeeper/zkdata #快照日志

mkdir -p zookeeper/ zkdatalog#事物日志


cd /opt/zookeeper/

wget https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz

tar -zxvf apache-zookeeper-3.5.9-bin.tar.gz


1.3 修改配置文件

#进入conf目录

cd /opt/zookeeper/ apache-zookeeper-3.5.9-bin/conf

cp zoo_sample.cfg zoo.cfg

#zoo_sample.cfg 官方的zookeeper样例文件。


配置文件:

# The number of milliseconds of each tick

tickTime=2000

#
 The number of ticks that the initial

#
 synchronization phase can take

initLimit=10

#
 The number of ticks that can pass between

#
 sending a request and getting an acknowledgement

syncLimit=5

#
 the directory where the snapshot is stored.

#
 do not use /tmp for storage, /tmp here is just

#
 example sakes.

dataDir=/opt/zookeeper/zkdata

dataLogDir=/opt/zookeeper/zkdatalog


#
 the port at which the clients will connect

clientPort=12181

#
 the maximum number of client connections.

#
 increase this if you need to handle more clients

#
maxClientCnxns=60

#


#
 Be sure to read the maintenance section of the

#
 administrator guide before turning on autopurge.

#


#
 http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

#


#
 The number of snapshots to retain in dataDir

#
autopurge.snapRetainCount=3

#
 Purge task interval in hours

#
 Set to "0" to disable auto purge feature

#
autopurge.purgeInterval=1


server.1=192.168.48.130:12888:13888

server.2=192.168.48.131:12888:13888

server.3=192.168.48.132:12888:13888


只根据情况修改了datadir 、clientPort和添加server配置。

#server.1 1为服务器标识,主要用于区分标识服务器,同样下面myid文件里也是根据这个数字命名。

#clientPort:客户端连接Zookeeper的端口。

1.4 创建myid文件

给zk集群服务器标识,整个zk集群用来发现彼此的一个重要标识。

#server1

echo "1" > /opt/zookeeper/zkdata/myid

#server2

echo "2" > /opt/zookeeper/zkdata/myid

#server3

echo "3" > /opt/zookeeper/zkdata/myid


1.5 启动服务

cd /opt/zookeeper/apache-zookeeper-3.5.9-bin/bin

./zkServer.sh start

./zkServer.sh status  #检查状态

[root@localhost apache-zookeeper-3.5.9-bin]# ./bin/zkServer.sh status

/usr/bin/java

ZooKeeper JMX enabled by default

Using config: /opt/zookeeper/apache-zookeeper-3.5.9-bin/bin/../conf/zoo.cfg

Client port found: 12181. Client address: localhost. Client SSL: false.

Mode: leader


#一般一个leader多个follower


2. kafka集群安装测试

2.1软件下载

#创建目录:

cd /opt/

mkdir kafka #创建项目目录

cd kafka

mkdir kafkalogs #kafka消息目录,主要存放kafka消息


#下载软件:

wget --no-check-certificate https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz


#解压软件:

tar -zxvf kafka_2.13-3.0.0.tgz


2.2 修改配置文件

cd /opt/kafka/kafka_2.13-3.0.0/config/


主要关注:server.properties 这个文件即可。

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements. See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License. You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.


# see kafka.server.KafkaConfig for additional details and defaults


############################# Server Basics #############################


# The id of the broker. This must be set to a unique integer for each broker.

broker.id=1


############################# Socket Server Settings #############################


# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured.

# FORMAT:

# listeners = listener_name://host_name:port

# EXAMPLE:

# listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://192.168.48.131:19092


# Hostname and port the broker will advertise to producers and consumers. If not set,

# it uses the value for "listeners" if configured. Otherwise, it will use the value

# returned from java.net.InetAddress.getCanonicalHostName().

#advertised.listeners=PLAINTEXT://your.host.name:9092


# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details

#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL


# The number of threads that the server uses for receiving requests from the network and sending responses to the network

num.network.threads=3


# The number of threads that the server uses for processing requests, which may include disk I/O

num.io.threads=8


# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=102400


# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=102400


# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600


############################# Log Basics #############################


# A comma separated list of directories under which to store log files

log.dirs=/opt/kafka/kafkalogs


# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=1


# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.

# This value is recommended to be increased for installations with data dirs located in RAID array.

num.recovery.threads.per.data.dir=1


############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"

# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1


############################# Log Flush Policy #############################


# Messages are immediately written to the filesystem but by default we only fsync() to sync

# the OS cache lazily. The following configurations control the flush of data to disk.

# There are a few important trade-offs here:

# 1. Durability: Unflushed data may be lost if you are not using replication.

# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.

# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.

# The settings below allow one to configure the flush policy to flush data after a period of time or

# every N messages (or both). This can be done globally and overridden on a per-topic basis.


# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000


# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000


############################# Log Retention Policy #############################


# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.


# The minimum age of a log file to be eligible for deletion due to age

log.retention.hours=48


# A size-based retention policy for logs. Segments are pruned from the log unless the remaining

# segments drop below log.retention.bytes. Functions independently of log.retention.hours.

#log.retention.bytes=1073741824


# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=1073741824


# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.retention.check.interval.ms=300000


############################# Zookeeper #############################


# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=192.168.48.130:12181,192.168.48.131:12181,192.168.48.132:12181


# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################


# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.

# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.

# The default value for this is 3 seconds.

# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.

# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.

group.initial.rebalance.delay.ms=0


#broker.id=1  每台服务器的broker.id都不能相同。

#zookeeper.connect=192.168.48.130:12181,192.168.48.131:12181,192.168.48.132:12181

#
listeners=PLAINTEXT://192.168.48.131:19092

#
log.dirs=/opt/kafka/kafkalogs


其他按需配置。

2.3 启动集群

cd /opt/kafka/kafka_2.13-3.0.0

./bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.13-3.0.0/config/server.properties


检查是否启动成功。

#jps

3284 Kafka

97419 Jps

5775 QuorumPeerMain

#
ps -ef|grep kafka


2.4 测试发布、消费消息

##创建topic:

./bin/kafka-topics.sh --create --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--replication-factor 3 --partitions 1 --topic test


## 删除topic:

./bin/kafka-topics.sh --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--delete --topic test


## topic列表查询:

./bin/kafka-topics.sh --list --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092


## 创建生产者:

./bin/kafka-console-producer.sh --broker-list 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--topic test


## 创建消费者:

./bin/kafka-console-consumer.sh --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--topic test  --from-beginning --consumer.config config/consumer.properties


##查看集群topic信息:

./bin/kafka-topics.sh --describe --bootstrap-server 
192.168.48.131:19092,192.168.48.130:19092,192.168.48.132:19092 
--topic test


3. 测试消费情况

--生产者:

--消费者:




本文作者:吴昊

本文来源:IT那活儿(上海新炬王翦团队)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/129619.html

相关文章

  • TiDB 在摩拜单车的深度实践及应用

    摘要:本文会选择三个场景,给大家简单介绍一下在摩拜单车的使用姿势遇到的问题以及解决方案。图在线业务集群拓扑图四数据沙盒集群离线业务数据沙盒,属于离线业务集群,是摩拜单车的一个数据聚合集群。 作者介绍:吕磊,摩拜单车高级 DBA。 一、业务场景 摩拜单车 2017 年开始将 TiDB 尝试应用到实际业务当中,根据业务的不断发展,TiDB 版本快速迭代,我们将 TiDB 在摩拜单车的使用场景逐渐...

    Paul_King 评论0 收藏0
  • 使用canal+Kafka进行数据库同步实践

    摘要:比如,服务数据库的数据来源于服务的数据库服务的数据有变更操作时,需要同步到服务中。第二种解决方案通过数据库的进行同步。并且,我们还用这套架构进行缓存失效的同步。目前这套同步架构正常运行中,后续有遇到问题再继续更新。在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B服务数据库的数据来源于A服务的数据库;A服务的数据有变更操作时,需要同步到B服务中。第一...

    Tecode 评论0 收藏0

发表评论

0条评论

IT那活儿

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<