SpringBoot实战(十四)之整合KafKa

本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题。 于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章。希望能够给大家帮助,少走一些弯路。 一、KafKa的介绍 1.主要功能 根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:   a.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因。   b.以容错的方式记录消息流,kafka以文件的方式来存储消息流。   c.可以再消息发布的时候进行处理。 2.使用场景 a.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能。 b.构建实时的流数据处理程序来变换或处理数据流,数据处理功能。 3.详细介绍 Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制 消息传输过程: Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。 Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息 Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。 二、安装 安装包下载地址:http://kafka.apache.org/downloads 找到0.11.0.1版本,如图: 1.下载 复制代码 wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz 复制代码 2.解压 复制代码 tar -xzvf kafka_2.11-0.11.0.1.tgz 复制代码 配置说明:   consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,此处我们使用默认的即可。   producer.properties 生产者配置,这个配置文件用于配置开启的生产者,此处我们使用默认的即可。   server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置。 a.broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可。 b.listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置, 例如:listeners=PLAINTEXT:// 192.168.126.143:9092。并确保服务器的9092端口能够访问。   c.zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper, 使用默认配置即可,zookeeper.connect=localhost:2181。 3.运行 首先运行zookeeper 复制代码 bin/zookeeper-server-start.sh config/zookeeper.properties 复制代码 运行成功,显示如图: 然后运行kafka 复制代码 bin/kafka-server-start.sh config/server.properties 复制代码 运行成功,显示如图: 三、整合KafKa 1.新建Maven项目导入Maven依赖 复制代码 4.0.0 cn.test kafka_demo 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 1.5.9.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-web org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka 1.1.1.RELEASE com.google.code.gson gson 2.8.2 org.springframework.boot spring-boot-maven-plugin org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 ${project.artifactId} 复制代码 2.编写消息实体 复制代码 package com.springboot.kafka.bean; import java.util.Date; import lombok.Data; @Data public class Message { private Long id; //id private String msg; //消息 private Date sendTime; //时间戳 } 复制代码 有了lombok,每次编写实体不必要使用快捷键生成seter或geter方法了,代码看起来更加简洁了。 3.编写消息发送者(可以理解为生产者,最好联系详细介绍中的图) 复制代码 package com.springboot.kafka.producer; import java.util.Date; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.springboot.kafka.bean.Message; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class KafkaSender { @Autowired private KafkaTemplate kafkaTemplate; private Gson gson = new GsonBuilder().create(); //发送消息方法 public void send() { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("+++++++++++++++++++++ message = {}", gson.toJson(message)); kafkaTemplate.send("zhisheng", gson.toJson(message)); } } 复制代码 4.编写消息接收者(可以理解为消费者) 复制代码 package com.springboot.kafka.producer; import java.util.Date; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.springboot.kafka.bean.Message; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class KafkaSender { @Autowired private KafkaTemplate kafkaTemplate; private Gson gson = new GsonBuilder().create(); //发送消息方法 public void send() { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("+++++++++++++++++++++ message = {}", gson.toJson(message)); kafkaTemplate.send("zhisheng", gson.toJson(message)); } } 复制代码 5.编写启动类 复制代码 package com.springboot.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import com.springboot.kafka.producer.KafkaSender; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args); KafkaSender sender = context.getBean(KafkaSender.class); for (int i = 0; i < 3; i++) { //调用消息发送类中的消息发送方法 sender.send(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } } 复制代码 6.编写application.properties配置文件 复制代码 #============== kafka =================== # \u6307\u5B9Akafka \u4EE3\u7406\u5730\u5740\uFF0C\u53EF\u4EE5\u591A\u4E2A spring.kafka.bootstrap-servers=192.168.126.143:9092 #=============== provider ======================= spring.kafka.producer.retries=0 # \u6BCF\u6B21\u6279\u91CF\u53D1\u9001\u6D88\u606F\u7684\u6570\u91CF spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 # \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= # \u6307\u5B9A\u9ED8\u8BA4\u6D88\u8D39\u8005group id spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 # \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer 复制代码 7.运行结果 示例代码地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo 如果按照上述流程没有达到预计的效果可以git clone到本地。https://www.cnblogs.com/youcong/p/10216573.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信