目录
实例代码:
1 package com.ys.utils; 2 3 import org.apache.kafka.clients.producer.*; 4 import java.util.Properties; 5 6 /** 7 * Create by YSOcean 8 */ 9 public class KafkaProducerUtils { 10 11 public static void main(String[] args) { 12 Properties kafkaProperties = new Properties(); 13 //配置broker地址信息14 kafkaProperties.put("bootstrap.servers", "192.168.146.200:9092,192.168.146.201:9092,192.168.146.202:9092"); 15 //配置 key 的序列化器16 kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 17 //配置 value 的序列化器18 kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 19 20 //通过上面的配置文件生成 Producer 对象21 Producer producer = new KafkaProducer(kafkaProperties); 22 //生成 ProducerRecord 对象,并制定 Topic,key 以及 value23 ProducerRecord<String,String> record =24 new ProducerRecord<String, String>("testTopic","key1","hello Producer"); 25 //发送消息26 producer.send(record); 27 } 28 }
通过运行上述代码,我们向名为 testTopic 的主题中发送了一条键为 key1,值为 hello Producer 的消息。

其中接口 serialization:
View Code②、自定义序列化器
如果Kafka提供的几个默认序列化器不能满足要求,即发送到 Kafka 的消息不是简单的字符串或整型,那么我们可以自定义序列化器。
比如对于如下的实体类 Person:

