摘要:发送消息在上述示例中我们使用了接口传入并发送,在实际实现中该方法使用另一个接口并传入了回调函数。需要注意的是,如果拦截器抛出异常,程序不会停止,只会写入一个级别的日志。如果下一个拦截器依赖于上一个的结果,那么最终得到的数据可能不正确。
Kafka作为当前流行的消息中间件,在消息队列、微服务架构、大数据平台等方面有着广泛的应用。如果将平台比作人体,Kafka便是神经系统,负责传递消息。本系列利用碎片化的时间,阅读Kafka源码深入了解各个模块的原理和实现,不定期更新。文中所有代码均来自https://github.com/apache/kafka
Kafka Producer简单使用示例KafkaProducer用于将事件从客户端应用发送至Kafka集群。Producer本身是线程安全的,并且多个线程共享单个实例时也会有性能上的提升。以下示例来自org.apache.kafka.clients.producer.KafkaProducer类:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
props变量定义了Producer的属性和基本配置信息:
bootstrap.servers:kafka服务器地址,事件会发送至该服务器。在生产环境中通常定义多个服务器并用逗号分隔,以防止单个服务器突然崩溃。
acks:当事件发送至Kafka集群时,数据在集群内部会有主从备份,acks定义了何时可以判定消息发送成功。
acks = 0时,Producer在消息发送后不会等待服务器返回结果,立刻返回成功。
acks = 1时,消息在主(leader)服务器写入后返回成功,不会等待从(follower)服务器备份完成。
acks = all时,消息在主从服务器都写入成功后才告知Producer发送成功。
retries: 当发送失败时,producer自动重发的次数,并不是所有的错误都可以触发自动重发,并且自动重发可能导致消息发送顺序错乱,具体信息将在以后的章节介绍
key.serializer/value.serializer: 所有发送至kafka的数据都是以byte形式存在的,key/value serializer负责将Java实例转化为字节。
使用上述配置初始化proudcer后,我们可以构建ProducerRecord,这里使用topic,key,value构建消息并调用producer.send方法发送至kafka集群。在程序结束前务必调用producer.close方法,因为默认情况下producer会在内存中batch多个事件,并一起发送以增加性能,close方法会强制发送当前内存中未发送的事件。
发送消息在上述示例中我们使用了send接口传入并发送ProducerRecord,在实际实现中该方法使用另一个send接口并传入了null回调函数。Kafka发送消息是异步的,回调函数可以获得发送结果,若发送成功,回调函数可以得到消息的元数据包括topic,partition,offset等。若失败可获得错误信息。
/** * Asynchronously send a record to a topic. Equivalent tosend(record, null)
. * See {@link #send(ProducerRecord, Callback)} for details. */ @Override public Futuresend(ProducerRecord record) { return send(record, null); } /** * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged. * * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one. */ @Override public Future
send(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
一个回调函数的例子:
producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { e.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } } });拦截器(ProducerInterceptor)
public interface ProducerInterceptorextends Configurable { /** * 消息发送前调用 */ public ProducerRecord onSend(ProducerRecord record); /** * 消息发送后,服务器返回结果(成功或错误)时调用 */ public void onAcknowledgement(RecordMetadata metadata, Exception exception); /** * 拦截器关闭时调用 */ public void close(); }
每一个Producer都可以设置一个或多个拦截器,拦截器允许客户端拦截或修改要发送的消息,通过Properties进行设置:
Properties props = new Properties(); ... props.put("interceptor.classes", "your.interceptor.class.name");
public class KafkaProducerimplements Producer { // ... other class members private final ProducerInterceptors interceptors; // Producer构造函数 KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer, Metadata metadata, KafkaClient kafkaClient) { // ...其他步骤省略 // 从config中获取拦截器实例,config从properties中构造 List > interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); this.interceptors = new ProducerInterceptors<>(interceptorList); } }
拦截器设置完成后,在send方法中进行调用:
@Override public FutureProducerInterceptorssend(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
ProducerInterceptors是一个容器类,封装了多个拦截器,onSend方法被producer的send方法调用。
/** * A container that holds the list {@link org.apache.kafka.clients.producer.ProducerInterceptor} * and wraps calls to the chain of custom interceptors. */ public class ProducerInterceptorsimplements Closeable { private final List > interceptors; public ProducerInterceptors(List > interceptors) { this.interceptors = interceptors; } public ProducerRecord onSend(ProducerRecord record) { ProducerRecord interceptRecord = record; // 按顺序执行每一个拦截器的onSend方法 for (ProducerInterceptor interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // do not propagate interceptor exception, log and continue calling other interceptors // be careful not to throw exception from here if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; } /** * 1. 当发送的消息被服务器接受并返回时调用 * 2. 当发送的消息未到达服务器之前就失败时调用(见下方onSendError方法) **/ public void onAcknowledgement(RecordMetadata metadata, Exception exception) { for (ProducerInterceptor interceptor : this.interceptors) { try { interceptor.onAcknowledgement(metadata, exception); } catch (Exception e) { // do not propagate interceptor exceptions, just log log.warn("Error executing interceptor onAcknowledgement callback", e); } } } /** * Producer在发送数据前要构建多种不同的信息,每一步都有可能抛出异常,本方法由producer在遇到异常时调用, * TopicPartition记录了topic和partition信息,由producer构建,但若异常发生在其构建之前,该参数为空,因此从record里提取topic和partition数据构建。 **/ public void onSendError(ProducerRecord record, TopicPartition interceptTopicPartition, Exception exception) { for (ProducerInterceptor interceptor : this.interceptors) { try { if (record == null && interceptTopicPartition == null) { interceptor.onAcknowledgement(null, exception); } else { if (interceptTopicPartition == null) { interceptTopicPartition = new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition()); } interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception); } } catch (Exception e) { // do not propagate interceptor exceptions, just log log.warn("Error executing interceptor onAcknowledgement callback", e); } } } }
需要注意的是,如果拦截器抛出异常,程序不会停止,只会写入一个warn级别的日志。并且拦截器链也不会停止执行,而是继续执行下一个拦截器。如果下一个拦截器依赖于上一个的结果,那么最终得到的数据可能不正确。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/68744.html
摘要:核心实现是这个方法通过不同的模式可以实现发送即忘忽略返回结果同步发送获取返回的对象,回调函数置为异步发送设置回调函数三种消息模式。 Kafka是一款很棒的消息系统,可以看看我之前写的 后端好书阅读与推荐来了解一下它的整体设计。今天我们就来深入了解一下它的实现细节(我fork了一份代码),首先关注Producer这一方。 要使用kafka首先要实例化一个KafkaProducer,需要有...
阅读 2628·2021-11-24 09:39
阅读 1623·2021-11-24 09:38
阅读 613·2021-11-22 14:44
阅读 1868·2021-11-18 10:02
阅读 2518·2021-11-18 10:02
阅读 1141·2021-10-14 09:43
阅读 4209·2021-09-29 09:35
阅读 473·2021-07-30 15:30