kafka原理_kafka入门(原理搭建简单使用)
前言
???????公司在用kafka接受和發(fā)送數(shù)據(jù),自己學(xué)習(xí)過Rabbitmq,不懂kafka挺不爽的,說干就干!網(wǎng)上找了許多帖子,學(xué)習(xí)了很多,小小的demo自己也搭建起來了,美滋滋,下面我認(rèn)為優(yōu)秀的網(wǎng)站和自己的步驟展現(xiàn)給大家。
一、kafka介紹與原理
我們將消息的發(fā)布(publish)稱作 producer,將消息的訂閱(subscribe)表述為 consumer,將中間的存儲(chǔ)陣列稱作 broker(代理),這樣就可以大致描繪出這樣一個(gè)場(chǎng)面:
生產(chǎn)者將數(shù)據(jù)生產(chǎn)出來,交給 broker 進(jìn)行存儲(chǔ),消費(fèi)者需要消費(fèi)數(shù)據(jù)了,就從broker中去拿出數(shù)據(jù)來,然后完成一系列對(duì)數(shù)據(jù)的處理操作。
乍一看返也太簡(jiǎn)單了,不是說了它是分布式嗎,難道把 producer、 broker 和 consumer 放在三臺(tái)不同的機(jī)器上就算是分布式了嗎。看 kafka 官方給出的圖:
多個(gè) broker 協(xié)同合作,producer 和 consumer 部署在各個(gè)業(yè)務(wù)邏輯中被頻繁的調(diào)用,三者通過 zookeeper管理協(xié)調(diào)請(qǐng)求和轉(zhuǎn)發(fā)。這樣一個(gè)高性能的分布式消息發(fā)布訂閱系統(tǒng)就完成了。
圖上有個(gè)細(xì)節(jié)需要注意,producer 到 broker 的過程是 push,也就是有數(shù)據(jù)就推送到 broker,而 consumer 到 broker 的過程是 pull,是通過 consumer 主動(dòng)去拉數(shù)據(jù)的,而不是 broker 把數(shù)據(jù)主懂發(fā)送到 consumer 端的。
二、kafka的linux基本搭建
??????此處還是給大家一個(gè)鏈接很是優(yōu)秀!kafka的linux搭建
注意:kafka依賴于zookeeper的節(jié)點(diǎn),需要搭建zookeeper,linux安裝zookeeper文章中有鏈接(kafaka高版本也自帶zookeeper,其實(shí)博主用kafka自帶的zookeeper啟動(dòng),然后啟動(dòng)kafka沒成功,用安裝的zookeeper就成功了…)
三、springboot整合kafka的簡(jiǎn)單demo
1.引入依賴
在springboot項(xiàng)目中的pom.xml引入下列依賴:
<dependency> <groupId>org.springframework.kafkagroupId> <artifactId>spring-kafkaartifactId> <version>2.2.6.RELEASEversion> dependency> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka-clientsartifactId> <version>2.1.0version> dependency> <dependency> <groupId>org.projectlombokgroupId> <artifactId>lombokartifactId> <optional>trueoptional> dependency>2.yml配置文件
配置如下:
spring: kafka: bootstrap-servers: 192.168.200.130:9092 #此處是我虛擬機(jī)上linux的ip kafak的默認(rèn)端口為9092 producer: #生產(chǎn)者 acks: 1 client-id: kafka-producer batch-size: 5 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: #消費(fèi)者 group-id: hello-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer application: name: kafkatestserver: port: 9192eureka: #eureka注冊(cè)中心配置 client: service-url: defaultZone: http://127.0.0.1:6868/eureka instance: prefer-ip-address: true?是不是依賴沒注釋看不懂ennnnn,別急嘛,那我來搬運(yùn)點(diǎn)詳細(xì)的注釋解釋(我的demo就是上面的)
???????????????????????????????????????????????producer
#============== kafka ===================# 指定kafka server的地址,集群配多個(gè),中間,逗號(hào)隔開spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== provider =======================# 寫入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫入失敗,# 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。spring.kafka.producer.retries=0# 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送spring.kafka.producer.batch-size=16384# produce積累數(shù)據(jù)一次發(fā)送,緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù)spring.kafka.producer.buffer-memory=33554432#procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:#acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。#acks = 1 這意味著leader會(huì)將記錄寫入其本地日志,但無需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。#acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。#可以設(shè)置的值為:all, -1, 0, 1spring.kafka.producer.acks=1# 指定消息key和消息體的編解碼方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?consumer
#=============== consumer =======================# 指定默認(rèn)消費(fèi)者group id --> 由于在kafka中,同一組中的consumer不會(huì)讀取到同一個(gè)消息,依靠groud.id設(shè)置組名spring.kafka.consumer.group-id=testGroup# smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設(shè)置smallestspring.kafka.consumer.auto-offset-reset=earliest# enable.auto.commit:true --> 設(shè)置自動(dòng)提交offsetspring.kafka.consumer.enable-auto-commit=true#如果'enable.auto.commit'為true,則消費(fèi)者偏移自動(dòng)提交給Kafka的頻率(以毫秒為單位),默認(rèn)值為5000。spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息體的編解碼方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer3.Controller
import lombok.AllArgsConstructor;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@AllArgsConstructor@RequestMapping("Kafka")public class SimpleController { private final KafkaTemplate<Object,Object> kafkaTemplate; @GetMapping("/send/{message}") public String send(@PathVariable("message") String message){ kafkaTemplate.send("topic1","topic1:"+message); kafkaTemplate.send("topic2","topic2:"+message); return message; }}4.Listener
import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class SimpleListener { @KafkaListener(topics = {"topic1","topic2"}) public void listen1(String data){ System.out.println(data); }}5.訪問
地址欄 輸入:http://localhost:9192/Kafka/send/helloKafaka (9192端口是我的boot項(xiàng)目的端口,看官根據(jù)自己的項(xiàng)目更改)
6.結(jié)果
控制臺(tái)輸出:
四,搭建的坑
我搭建使用的是虛擬機(jī)上的linux所以要么開放kafka端口要么關(guān)閉防火墻如果不開放項(xiàng)目啟動(dòng)的時(shí)候報(bào)錯(cuò):Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
點(diǎn)擊解決方案(找了好久還是大佬厲害)
解決方案
總結(jié)
??????????以上就是今天要展示的kafka入門,雖然內(nèi)容不多,但是入門了對(duì)吧,更深層次的原理及其使用場(chǎng)景(大數(shù)據(jù)等等)還等著我們?nèi)グl(fā)掘呢,給我的感覺就是基本的會(huì)了,但是牛批的場(chǎng)景使用還是不會(huì),就像高數(shù)一樣同樣學(xué)的高數(shù)課本但是有的題你不會(huì)人家會(huì),而且還能會(huì)出花樣來,所以多接觸優(yōu)秀的事物,多學(xué)習(xí),多總結(jié)成就優(yōu)秀的你,加油,陌生人,越努力越幸運(yùn)!
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的kafka原理_kafka入门(原理搭建简单使用)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 对于频繁的写数据处理方式
- 下一篇: git 常用命令笔记