资讯专栏INFORMATION COLUMN

慕课网_《Kafka流处理平台》学习总结

Maxiye / 2286人阅读

摘要:慕课网流处理平台学习总结时间年月日星期日说明本文部分内容均来自慕课网。

慕课网《Kafka流处理平台》学习总结

时间:2018年09月09日星期日

说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com

教学源码:无

学习源码:https://github.com/zccodere/s...

第一章:课程介绍 1-1 课程介绍

课程介绍

Kafka概念解析

Kafka结构设计

Kafka场景应用

Kafka高级特性

第二章:概念解析 2-1 发展背景

LinkedIn 开源

Databus 分布式数据同步系统

Cubert 高性能计算引擎

ParSeq Java异步处理框架

Kafka 分布式发布订阅消息系统,流处理平台

Kafka发展历程

LinkedIn 开发

2011年初开源,加入Apache基金会

2012年从Apache Incubator毕业

Apache顶级开源项目

Kafka的特性

可以发布和订阅且记录数据的流,类似于消息队列

数据流存储的平台,具备容错能力

在数据产生时就可以进行处理

Kafka通常被用于

构建实时数据流管道

构建实时数据流处理

Kafka是什么

面向于数据流的生产、转换、存储、消费整体的流处理平台

Kafka不仅仅是一个消息队列

2-2 基本概念

Producer:数据生产者

消息和数据的生产者

向Kafka的一个topic发布消息的进程或代码或服务

Consumer:数据消费者

消息和数据的消费者

向Kafka订阅数据(topic)并且处理其发布的消息的进程或代码或服务

Consumer Group:消费者组

对于同一个topic,会广播给不同的Group

一个Group中,只有一个Consumer可以消费该消息

Broker:服务节点

Kafka集群中的每个Kafka节点

Topic:主题

Kafka消息的类别

对数据进行区分、隔离

Partition:分区

Kafka中数据存储的基本单元

一个topic数据,会被分散存储到多个Partition

一个Partition只会存在一个Broker上

每个Partition是有序的

Replication:分区的副本

同一个Partition可能会有多个Replication

多个Replication之间数据是一样的

Replication Leader:副本的老大

一个Partition的多个Replication上

需要一个Leader负责该Partition上与Producer和Consumer交互

Replication Manager:副本的管理者

负责管理当前Broker所有分区和副本的信息

处理KafkaController发起的一些请求

副本状态的切换

添加、读取消息等

2-3 概念延伸

Partition:分区

每一个Topic被切分为多个Partition

消费者数目少于或等于Partition的数目

Broker Group中的每一个Broker保存Topic的一个或多个Partition

Consumer Group中的仅有一个Consumer读取Topic的一个或多个Partition,并且是惟一的Consumer

Replication:分区的副本

当集群中有Broker挂掉的情况,系统可以主动地使Replication提供服务

系统默认设置每一个Topic的Replication系数为1,可以在创建Topic时多带带设置

Replication的基本单位是Topic的Partition

所有的读和写都从Replication Leader进行,Replication Followers只是作为备份

Replication Followers必须能够及时复制Replication Leader的数据

增加容错性与可扩展性

第三章:结构设计 3-1 基本结构

Kafka功能结构

Kafka数据流势

Kafka消息结构

Offset:当前消息所处于的偏移

Length:消息的长度

CRC32:校验字段,用于校验当前信息的完整性

Magic:很多分布式系统都会设计该字段,固定的数字,用于快速判定当前信息是否为Kafka消息

attributes:可选字段,消息的属性

Timestamp:时间戳

Key Length:Key的长度

Key:Key

Value Length:Value的长度

Value:Value

3-2 功能特点

Kafka特点:分布式

多分区

多副本

多订阅者

基于Zookeeper调度

Kafka特点:高性能

高吞吐量

低延迟

高并发

时间复杂度为O(1)

Kafka特点:持久性与扩展性

数据可持久化

容错性

支持在线水平扩展

消息自动平衡

第四章:场景应用 4-1 应用场景

Kafka应用场景

消息队列

行为跟踪

元信息监控

日志收集

流处理

事件源

持久性日志(commit log)

4-2 应用案例

Kafka简单案例

部署启动

简单生产者

简单消费者

学习笔记

1.下载与安装
Zookeeper下载:https://zookeeper.apache.org/releases.html#download
Kafka下载:http://kafka.apache.org/downloads
安装:解压、配置环境变量

2.Zookeeper启动
解压:tar -zxf zookeeper-3.4.12.tar.gz
目录:cd zookeeper-3.4.12/bin
启动:./zkServer.sh start /home/zc/server/kafka_2.12-2.0.0/config/zookeeper.properties

3.Kafka启动
解压:tar -zxf kafka_2.12-2.0.0.tgz
目录:cd kafka_2.12-2.0.0
启动:sudo bin/kafka-server-start.sh  config/server.properties

4.使用控制台操作生产者与消费者
创建Topic:sudo ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic myimooc-kafka-topic
查看Topic:sudo ./bin/kafka-topics.sh --list --zookeeper localhost:2181
启动生产者:sudo ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myimooc-kafka-topic
启动消费者:sudo ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myimooc-kafka-topic --from-beginning
生产消息:first message
生产消息:second message
4-3 代码案例

创建49-kafka-example的maven工程pom如下



    
        49-kafka
        com.myimooc
        1.0-SNAPSHOT
    
    4.0.0

    49-kafka-example

    
        2.0.4.RELEASE
    

    
        
            
                org.springframework.boot
                spring-boot-parent
                ${spring.boot.version}
                pom
                import
            
        
    

    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.kafka
            spring-kafka
        
        
            com.alibaba
            fastjson
            1.2.36
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

1.编写MessageEntity

package com.myimooc.kafka.example.common;

import java.util.Objects;

/**
 * 
* 标题: 消息实体
* 描述: 消息实体
* 时间: 2018/09/09
* * @author zc */ public class MessageEntity { /** * 标题 */ private String title; /** * 内容 */ private String body; @Override public String toString() { return "MessageEntity{" + "title="" + title + """ + ", body="" + body + """ + "}"; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } MessageEntity that = (MessageEntity) o; return Objects.equals(title, that.title) && Objects.equals(body, that.body); } @Override public int hashCode() { return Objects.hash(title, body); } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } }

2.编写SimpleProducer

package com.myimooc.kafka.example.producer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 
* 标题: 生产者
* 描述: 生产者
* 时间: 2018/09/09
* * @author zc */ @Component public class SimpleProducer { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String key, Object entity) { logger.info("发送消息入参:{}", entity); ProducerRecord record = new ProducerRecord<>( topic, key, JSON.toJSONString(entity) ); long startTime = System.currentTimeMillis(); ListenableFuture> future = this.kafkaTemplate.send(record); future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable ex) { logger.error("消息发送失败:{}", ex); } @Override public void onSuccess(SendResult result) { long elapsedTime = System.currentTimeMillis() - startTime; RecordMetadata metadata = result.getRecordMetadata(); StringBuilder record = new StringBuilder(128); record.append("message(") .append("key = ").append(key).append(",") .append("message = ").append(entity).append(")") .append("send to partition(").append(metadata.partition()).append(")") .append("with offset(").append(metadata.offset()).append(")") .append("in ").append(elapsedTime).append(" ms"); logger.info("消息发送成功:{}", record.toString()); } }); } }

3.编写SimpleConsumer

package com.myimooc.kafka.example.consumer;

import com.alibaba.fastjson.JSONObject;
import com.myimooc.kafka.example.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * 
* 标题: 消费者
* 描述: 消费者
* 时间: 2018/09/09
* * @author zc */ @Component public class SimpleConsumer { private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = "${kafka.topic.default}") public void listen(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { //判断是否NULL Optional kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { //获取消息 Object message = kafkaMessage.get(); MessageEntity messageEntity = JSONObject.parseObject(message.toString(), MessageEntity.class); logger.info("接收消息Topic:{}", topic); logger.info("接收消息Record:{}", record); logger.info("接收消息Message:{}", messageEntity); } } }

4.编写Response

package com.myimooc.kafka.example.common;

import java.io.Serializable;

/**
 * 
* 标题: REST请求统一响应对象
* 描述: REST请求统一响应对象
* 时间: 2018/09/09
* * @author zc */ public class Response implements Serializable { private static final long serialVersionUID = -972246069648445912L; /** * 响应编码 */ private int code; /** * 响应消息 */ private String message; public Response() { } public Response(int code, String message) { this.code = code; this.message = message; } @Override public String toString() { return "Response{" + "code=" + code + ", message="" + message + """ + "}"; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }

5.编写ErrorCode

package com.myimooc.kafka.example.common;

/**
 * 
* 标题: 错误编码
* 描述: 错误编码
* 时间: 2018/09/09
* * @author zc */ public class ErrorCode { /** * 成功 */ public final static int SUCCESS = 200; /** * 失败 */ public final static int EXCEPTION = 500; }

6.编写ProducerController

package com.myimooc.kafka.example.controller;

import com.alibaba.fastjson.JSON;
import com.myimooc.kafka.example.common.ErrorCode;
import com.myimooc.kafka.example.common.MessageEntity;
import com.myimooc.kafka.example.common.Response;
import com.myimooc.kafka.example.producer.SimpleProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;

/**
 * 
* 标题: 生产者Controller
* 描述: 生产者Controller
* 时间: 2018/09/09
* * @author zc */ @RestController @RequestMapping("/producer") public class ProducerController { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private SimpleProducer simpleProducer; @Value("${kafka.topic.default}") private String topic; private static final String KEY = "key"; @PostMapping("/send") public Response sendKafka(@RequestBody MessageEntity message) { try { logger.info("kafka的消息:{}", JSON.toJSONString(message)); this.simpleProducer.send(topic, KEY, message); logger.info("kafka消息发送成功!"); return new Response(ErrorCode.SUCCESS,"kafka消息发送成功"); } catch (Exception ex) { logger.error("kafka消息发送失败:", ex); return new Response(ErrorCode.EXCEPTION,"kafka消息发送失败"); } } }

7.编写application.properties

##----------kafka配置
## TOPIC
kafka.topic.default=myimooc-kafka-topic
# kafka地址
spring.kafka.bootstrap-servers=192.168.0.105:9092
# 生产者配置
spring.kafka.producer.retries=0
# 批量发送消息的数量
spring.kafka.producer.batch-size=4096
# 缓存容量
spring.kafka.producer.buffer-memory=40960
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
spring.kafka.consumer.group-id=myimooc
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3

8.编写ExampleApplication

package com.myimooc.kafka.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * 
* 标题: 启动类
* 描述: 启动类
* 时间: 2018/09/09
* * @author zc */ @SpringBootApplication @EnableKafka public class ExampleApplication { public static void main(String[] args) { SpringApplication.run(ExampleApplication.class, args); } }
第五章:高级特性 5-1 消息事务

为什么要支持事务

满足“读取-处理-写入”模式

流处理需求的不断增强

不准确的数据处理的容忍度不断降低

数据传输的事务定义

最多一次:消息不会被重复发送,最多被传输一次,但也有可能一次不传输

最少一次:消息不会被漏发送,最少被传输一次,但也有可能被重复传输

精确的一次(Exactly once):不会漏传输也不会重复传输,每个消息都被传输一次且仅仅被传输一次,这是大家所期望的

事务保证

内部重试问题:Procedure幂等处理

多分区原子写入

避免僵尸实例

每个事务Procedure分配一个 transactionl. id,在进程重新启动时能够识别相同的Procedure实例
Kafka增加了一个与transactionl.id相关的epoch,存储每个transactionl.id内部元数据
一旦epoch被触发,任务具有相同的transactionl.id和更旧的epoch的Producer被视为僵尸,Kafka会拒绝来自这些Producer的后续事务性写入

5-2 零拷贝

零拷贝简介

网络传输持久性日志块

Java Nio channel.transforTo()方法

Linux sendfile系统调用

文件传输到网络的公共数据路径

第一次拷贝:操作系统将数据从磁盘读入到内核空间的页缓存

第二次拷贝:应用程序将数据从内核空间读入到用户空间缓存中

第三次拷贝:应用程序将数据写回到内核空间到socket缓存中

第四次拷贝:操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出

零拷贝过程(指内核空间和用户空间的交互拷贝次数为零)

第一次拷贝:操作系统将数据从磁盘读入到内核空间的页缓存

将数据的位置和长度的信息的描述符增加至内核空间(socket缓存区)

第二次拷贝:操作系统将数据从内核拷贝到网卡缓冲区,以便将数据经网络发出

文件传输到网络的公共数据路径演变

第六章:课程总结 6-1 课程总结

课程总结

Kafka基础概念与结构

Kafka的特点

Kafka应用场景

Kafka应用案例

Kafka高级特性

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77032.html

相关文章

  • 课网_《Java消息中间件》学习总结

    摘要:时间年月日星期六说明本文部分内容均来自慕课网。这个时候,可以启动多台积分系统,来同时消费这个消息中间件里面的登录消息,达到横向扩展的作用。 时间:2017年07月22日星期六说明:本文部分内容均来自慕课网。@慕课网:http://www.imooc.com教学源码:无学习源码:https://github.com/zccodere/s... 第一章:课程介绍 1-1 课程安排 Java...

    twohappy 评论0 收藏0
  • 课网_《微服务架构在二手交易平台(转转)中的实践》学习总结

    时间:2017年07月06日星期四说明:本文部分内容均来自慕课网。@慕课网:http://www.imooc.com教学示例源码:无学习学习源码:无 第一章:微服务架构在二手交易平台(转转)中的实践 1-1 微服务架构特点 分享要点-微服务架构 特点 使用原因 演进 通信协议、服务注册与发现 柔性可用实践 服务治理 什么是微服务 微服务是一系列小服务的组合 微服务可以单独运行,独立的进程 微服务整...

    ckllj 评论0 收藏0
  • 课网_《Java实现Base64加密》学习总结

    摘要:时间年月日星期一说明本文部分内容均来自慕课网。多用于网络加密。散列函数函数或消息摘要函数主要作用散列函数用来验证数据的完整性。 时间:2017年4月10日星期一说明:本文部分内容均来自慕课网。@慕课网:http://www.imooc.com教学示例源码:https://github.com/zccodere/s...个人学习源码:https://github.com/zccodere...

    verano 评论0 收藏0
  • 课网_《RxJava与RxAndroid基础入门》学习总结

    时间:2017年10月16日星期一说明:本文部分内容均来自慕课网。@慕课网:http://www.imooc.com教学源码:无学习源码:https://github.com/zccodere/s... 第一章:课程简介 1-1 课程介绍 本门课程的主要内容 RxJava是什么 RxAndroid是什么 RxJava常用操作符(重点、难点) 怎样在项目中使用RxJava和RxAndroid 如何学...

    刘明 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<