日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka原理_kafka入门(原理搭建简单使用)

發(fā)布時(shí)間:2023/12/19 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka原理_kafka入门(原理搭建简单使用) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前言

???????公司在用kafka接受和發(fā)送數(shù)據(jù),自己學(xué)習(xí)過Rabbitmq,不懂kafka挺不爽的,說干就干!網(wǎng)上找了許多帖子,學(xué)習(xí)了很多,小小的demo自己也搭建起來了,美滋滋,下面我認(rèn)為優(yōu)秀的網(wǎng)站和自己的步驟展現(xiàn)給大家。

一、kafka介紹與原理

我們將消息的發(fā)布(publish)稱作 producer,將消息的訂閱(subscribe)表述為 consumer,將中間的存儲(chǔ)陣列稱作 broker(代理),這樣就可以大致描繪出這樣一個(gè)場(chǎng)面:

生產(chǎn)者將數(shù)據(jù)生產(chǎn)出來,交給 broker 進(jìn)行存儲(chǔ),消費(fèi)者需要消費(fèi)數(shù)據(jù)了,就從broker中去拿出數(shù)據(jù)來,然后完成一系列對(duì)數(shù)據(jù)的處理操作。

乍一看返也太簡(jiǎn)單了,不是說了它是分布式嗎,難道把 producer、 broker 和 consumer 放在三臺(tái)不同的機(jī)器上就算是分布式了嗎。看 kafka 官方給出的圖:

多個(gè) broker 協(xié)同合作,producer 和 consumer 部署在各個(gè)業(yè)務(wù)邏輯中被頻繁的調(diào)用,三者通過 zookeeper管理協(xié)調(diào)請(qǐng)求和轉(zhuǎn)發(fā)。這樣一個(gè)高性能的分布式消息發(fā)布訂閱系統(tǒng)就完成了。

圖上有個(gè)細(xì)節(jié)需要注意,producer 到 broker 的過程是 push,也就是有數(shù)據(jù)就推送到 broker,而 consumer 到 broker 的過程是 pull,是通過 consumer 主動(dòng)去拉數(shù)據(jù)的,而不是 broker 把數(shù)據(jù)主懂發(fā)送到 consumer 端的。

二、kafka的linux基本搭建

??????此處還是給大家一個(gè)鏈接很是優(yōu)秀!kafka的linux搭建
注意:kafka依賴于zookeeper的節(jié)點(diǎn),需要搭建zookeeper,linux安裝zookeeper文章中有鏈接(kafaka高版本也自帶zookeeper,其實(shí)博主用kafka自帶的zookeeper啟動(dòng),然后啟動(dòng)kafka沒成功,用安裝的zookeeper就成功了…)

三、springboot整合kafka的簡(jiǎn)單demo

1.引入依賴

在springboot項(xiàng)目中的pom.xml引入下列依賴:

<dependency> <groupId>org.springframework.kafkagroupId> <artifactId>spring-kafkaartifactId> <version>2.2.6.RELEASEversion> dependency> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka-clientsartifactId> <version>2.1.0version> dependency> <dependency> <groupId>org.projectlombokgroupId> <artifactId>lombokartifactId> <optional>trueoptional> dependency>

2.yml配置文件

配置如下:

spring: kafka: bootstrap-servers: 192.168.200.130:9092 #此處是我虛擬機(jī)上linux的ip kafak的默認(rèn)端口為9092 producer: #生產(chǎn)者 acks: 1 client-id: kafka-producer batch-size: 5 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: #消費(fèi)者 group-id: hello-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer application: name: kafkatestserver: port: 9192eureka: #eureka注冊(cè)中心配置 client: service-url: defaultZone: http://127.0.0.1:6868/eureka instance: prefer-ip-address: true

?是不是依賴沒注釋看不懂ennnnn,別急嘛,那我來搬運(yùn)點(diǎn)詳細(xì)的注釋解釋(我的demo就是上面的)

???????????????????????????????????????????????producer

#============== kafka ===================# 指定kafka server的地址,集群配多個(gè),中間,逗號(hào)隔開spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== provider =======================# 寫入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫入失敗,# 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。spring.kafka.producer.retries=0# 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送spring.kafka.producer.batch-size=16384# produce積累數(shù)據(jù)一次發(fā)送,緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù)spring.kafka.producer.buffer-memory=33554432#procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:#acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。#acks = 1 這意味著leader會(huì)將記錄寫入其本地日志,但無需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。#acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。#可以設(shè)置的值為:all, -1, 0, 1spring.kafka.producer.acks=1# 指定消息key和消息體的編解碼方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?consumer

#=============== consumer =======================# 指定默認(rèn)消費(fèi)者group id --> 由于在kafka中,同一組中的consumer不會(huì)讀取到同一個(gè)消息,依靠groud.id設(shè)置組名spring.kafka.consumer.group-id=testGroup# smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設(shè)置smallestspring.kafka.consumer.auto-offset-reset=earliest# enable.auto.commit:true --> 設(shè)置自動(dòng)提交offsetspring.kafka.consumer.enable-auto-commit=true#如果'enable.auto.commit'為true,則消費(fèi)者偏移自動(dòng)提交給Kafka的頻率(以毫秒為單位),默認(rèn)值為5000。spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息體的編解碼方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.Controller

import lombok.AllArgsConstructor;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@AllArgsConstructor@RequestMapping("Kafka")public class SimpleController { private final KafkaTemplate<Object,Object> kafkaTemplate; @GetMapping("/send/{message}") public String send(@PathVariable("message") String message){ kafkaTemplate.send("topic1","topic1:"+message); kafkaTemplate.send("topic2","topic2:"+message); return message; }}

4.Listener

import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class SimpleListener { @KafkaListener(topics = {"topic1","topic2"}) public void listen1(String data){ System.out.println(data); }}

5.訪問

地址欄 輸入:http://localhost:9192/Kafka/send/helloKafaka (9192端口是我的boot項(xiàng)目的端口,看官根據(jù)自己的項(xiàng)目更改)

6.結(jié)果

控制臺(tái)輸出:

四,搭建的坑

我搭建使用的是虛擬機(jī)上的linux所以要么開放kafka端口要么關(guān)閉防火墻如果不開放項(xiàng)目啟動(dòng)的時(shí)候報(bào)錯(cuò):Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
點(diǎn)擊解決方案(找了好久還是大佬厲害)
解決方案

總結(jié)

??????????以上就是今天要展示的kafka入門,雖然內(nèi)容不多,但是入門了對(duì)吧,更深層次的原理及其使用場(chǎng)景(大數(shù)據(jù)等等)還等著我們?nèi)グl(fā)掘呢,給我的感覺就是基本的會(huì)了,但是牛批的場(chǎng)景使用還是不會(huì),就像高數(shù)一樣同樣學(xué)的高數(shù)課本但是有的題你不會(huì)人家會(huì),而且還能會(huì)出花樣來,所以多接觸優(yōu)秀的事物,多學(xué)習(xí),多總結(jié)成就優(yōu)秀的你,加油,陌生人,越努力越幸運(yùn)!

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

總結(jié)

以上是生活随笔為你收集整理的kafka原理_kafka入门(原理搭建简单使用)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。