前言
主要关注 bootstrap.servers,它是必填参数。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094。
其余几个参数暂时不做讨论,后文会有详细介绍。
接着注入这个 bean 即可调用它的发送函数发送消息。

这里我给某一个 Topic 发送了 10W 条数据,运行程序消息正常发送。
但这仅仅只是做到了消息发送,对消息是否成功送达完全没管,等于是纯异步的方式。
同步
那么我想知道消息到底发送成功没有该怎么办呢?
其实 Producer 的 API 已经帮我们考虑到了,发送之后只需要调用它的 get() 方法即可同步获取发送结果。

发送结果:

这样的发送效率其实是比较低下的,因为每次都需要同步等待消息发送的结果。
异步
为此我们应当采取异步的方式发送,其实 send() 方法默认则是异步的,只要不手动调用 get()方法。
但这样就没法获知发送结果。
所以查看 send() 的 API 可以发现还有一个参数。
Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);Callback 是一个回调接口,在消息发送完成之后可以回调我们自定义的实现。

执行之后的结果:

同样的也能获取结果,同时发现回调的线程并不是上文同步时的主线程,这样也能证明是异步回调的。
同时回调的时候会传递两个参数:
RecordMetadata和上文一致的消息发送成功后的元数据。Exception消息发送过程中的异常信息。
但是这两个参数并不会同时都有数据,只有发送失败才会有异常信息,同时发送元数据为空。
所以正确的写法应当是:

至于为什么会只有参数一个有值,在下文的源码分析中会一一解释。
源码分析
现在只掌握了基本的消息发送,想要深刻的理解发送中的一些参数配置还是得源码说了算。
首先还是来谈谈消息发送时的整个流程是怎么样的,Kafka 并不是简单的把消息通过网络发送到了 broker 中,在 Java 内部还是经过了许多优化和设计。
发送流程
为了直观的了解发送的流程,简单的画了几个在发送过程中关键的步骤。

从上至下依次是:
- 初始化以及真正发送消息的
kafka-producer-network-threadIO 线程。 - 将消息序列化。
- 得到需要发送的分区。
- 写入内部的一个缓存区中。
- 初始化的 IO 线程不断的消费这个缓存来发送消息。
步骤解析
接下来详解每个步骤。
初始化

调用该构造方法进行初始化时,不止是简单的将基本参数写入 KafkaProducer。比较麻烦的是初始化 Sender 线程进行缓冲区消费。
初始化 IO 线程处:

可以看到 Sender 线程有需要成员变量,比如:
