2019獨角獸企業重金招聘Python工程師標準>>>
Kafka 入門和 Spring Boot 集成
概述
kafka 是一個高性能的消息隊列,也是一個分布式流處理平臺(這里的流指的是數據流)。由java 和 Scala 語言編寫,最早由 LinkedIn 開發,并 2011年開源,現在由 Apache 開發維護。
應用場景
下面列舉了一些kafka常見的應用場景。
消息隊列 : Kafka 可以作為消息隊列使用,可用于系統內異步解耦,流量削峰等場景。
應用監控:利用 Kafka 采集應用程序和服務器健康相關的指標,如應用程序相關的日志,服務器相關的 CPU、占用率、 IO、內存、連接數、 TPS、 QPS等,然后將指標信息進行處理,從而構建一個具有監控儀表盤、曲線圖等可視化監控系統。 例如, 很多公司采用 Kafka 與 ELK(ElasticSearch、 Logstash 和Kibana)整合構建應用服務的監控系統。
流處理:比如將 kafka 接收到的數據發送給 Storm 流式計算框架處理。
基本概念
record(消息):kafka 通信的基本單位,每一條消息稱為record
producer (生產者 ):發送消息的客戶端。
consumer(消費者 ):消費消息的客戶端。
consumerGroup (消費者組):每一個消費者都屬于一個特定的消費者組。
消費者和消費者組的關系:
- 如果a,b,c 屬于同一個消費者組,那一條消息只能被 a,b,c 中的某一個消費者消費。
- 如果a,b,c 屬于不同的消費者組(比如 ga,gb,gc) ,那一條消息過來,a,b,c 三個消費者都能消費到。
topic (主題): kafka的消息通過topic來分類,類似于數據庫的表。 producer 發布消息到 topic,consumer訂閱 topic 進行消費
partition( 分區):一個topic會被分成一到多個分區(partition),然后多個分區可以分布在不同的機器上,這樣一個主題就相當于運行在了多臺機子上,kafka用分區的方式提高了性能和吞吐量
replica (副本):一個分區有一到多個副本,副本的作用是提高分區的 可用性。
offset(偏移量):偏移量 類似數據庫自增int Id,隨著數據的不斷寫入 kafka 分區內的偏移量會不斷增加,一條消息由一個唯一的偏移量來標識。偏移量的作用是,讓消費者知道自己消費到了哪個位置,下次可以接著從這里消費。如下圖: 消費者A 消費到了 offset 為 9 的記錄,消費者 B 消費到了offset 為 11 的記錄。
基本結構
kafka 最基本的結構如下,跟常見的消息隊列結構一樣。 消息通過生產者發送到 kafka 集群, 然后消費者從 kafka 集群拉取消息進行消費。
和Spring Boot 集成
集成概述
本集成方式采用的是 spring boot 官方文檔說的集成方式,官方鏈接,集成的大體思路是,通過在 spring boot application.properties 中配置 生產者和消費者的基本信息,然后spring boot 啟動后會創建 KafkaTemplate 對象,這個對象可以用來發送消息到Kafka,然后用 @KafkaListener 注解來消費 kafka 里面的消息,具體步驟如下。
集成環境
spring boot:1.5.13 版本 spring-kafka:1.3.5 版本 kafka:1.0.1 版本
kafka 環境搭建
先啟動Zookeeper:
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime zookeeper:latest
再啟動Kafka:替換下面的IP為你服務器IP即可
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.10.253 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:1.0.1
Spring Boot 和 Spring for Apache Kafka 集成步驟
首先pom中引入 Spring for Apache Kafka<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
然后 application.properties 配置文件中加入如下配置: 各個配置的解釋見:spring boot 附錄中的 kafka 配置,搜索kafka 關鍵字即可定位。server.port=8090####### kafka### producer 配置
spring.kafka.producer.bootstrap-servers=192.168.10.48:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer### consumer 配置
spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092
spring.kafka.consumer.group-id=anuoapp
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5
創建 Kafka Producer 生產者package com.example.anuoapp.kafka;import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;@Component
public class KafkaProducer {@AutowiredKafkaTemplate kafkaTemplate;public void kafkaSend() throws Exception {UserAccount userAccount=new UserAccount();userAccount.setCard_name("jk");userAccount.setAddress("cd");ListenableFuture send = kafkaTemplate.send("jktopic", "key", JSON.toJSONString(userAccount));}
}
創建 Kafka Consumer 消費者package com.example.anuoapp.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topics = {"jktopic"})public void jktopic(ConsumerRecord consumerRecord) throws InterruptedException {System.out.println(consumerRecord.offset());System.out.println(consumerRecord.value().toString());Thread.sleep(3000);}}
創建一個rest api 來調用 Kafka 的消息生產者package com.example.anuoapp.controller;import com.example.anuoapp.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/api/system")
public class SystemController {private Logger logger = LoggerFactory.getLogger(SystemController.class);@AutowiredKafkaProducer kafkaProducer;@RequestMapping(value = "/Kafka/send", method = RequestMethod.GET)public void WarnInfo() throws Exception {int count=10;for (int i = 0; i < count; i++) {kafkaProducer.kafkaSend();}}}
用 post man 調用 第 5 步創建的接口, 就可以看到 如下消費者產生的輸出信息30
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
31
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
32
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
最后
恭喜你 ! spring boot kafka 集成完畢。
完整的基礎源碼見: 鏈接: https://pan.baidu.com/s/1E2Lmbj9A9uruTXG54uPl_g 密碼: e6d6
踩過的坑
集成 spring kafka的時候注意版本要選對 ,目前spring boot 1.5.1 對應集成 spring kafka 1.x 版本 , 如果直接集成 spring kafka 2.x 版本 的話會報錯, 要用 spring kafka 2.x 的話, 需要升級spring boot 到 2.x 版本。
消費者配置的時候 spring.kafka.consumer.max-poll-records=1 ,輪詢拉取的最大記錄數要設置成1,不然會出現重復消費,即:如果消費者程序崩了,再啟動起來,會消費到以前消費過的數據,造成重復消費。 設置成 1 就沒得這個問題了, 不知道是不是 spring kafka 這個lib封裝得有問題。
轉載于:https://my.oschina.net/anuodog/blog/1824848
總結
以上是生活随笔為你收集整理的Kafka 入门和 Spring Boot 集成的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。