日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

30.kafka数据同步Elasticsearch深入详解(ES与Kafka同步)

發布時間:2024/1/8 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 30.kafka数据同步Elasticsearch深入详解(ES与Kafka同步) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、kafka同步到Elasticsearch方式?

目前已知常用的方式有四種:?
1)logstash_input_kafka插件;?
缺點:不穩定(ES中文社區討論)?
2)spark stream同步;?
缺點:太龐大?
3)kafka connector同步;?
4)自寫程序讀取、解析、寫入?
?
本文主要基于kafka connector實現kafka到Elasticsearch全量、增量同步。

2、從confluenct說起

LinkedIn有個三人小組出來創業了—正是當時開發出Apache Kafka實時信息列隊技術的團隊成員,基于這項技術Jay Kreps帶頭創立了新公司Confluent。Confluent的產品圍繞著Kafka做的。?
Confluent Platform簡化了連接數據源到Kafka,用Kafka構建應用程序,以及安全,監控和管理您的Kafka的基礎設施。?
confluent組成如下所示:?

1)Apache Kafka 消息分發組件,數據采集后先入Kafka。 2)Schema Registry Schema管理服務,消息出入kafka、入hdfs時,給數據做序列化/反序列化處理。 3)Kafka Connect 提供kafka到其他存儲的管道服務,此次焦點是從kafka到hdfs,并建立相關HIVE表。 4)Kafka Rest Proxy 提供kafka的Rest API服務。 5)Kafka Clients 提供Client編程所需SDK。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

默認端口對應表:

組件 | 端口

Apache Kafka brokers (plain text):9092

Confluent Control Center:9021

Kafka Connect REST API:8083

REST Proxy:8082

Schema Registry REST API:8081

ZooKeeper:2181

3、kafka connector介紹。

Kafka 0.9+增加了一個新的特性 Kafka Connect,可以更方便的創建和管理數據流管道。它為Kafka和其它系統創建規模可擴展的、可信賴的流數據提供了一個簡單的模型。

通過 connectors可以將大數據從其它系統導入到Kafka中,也可以從Kafka中導出到其它系統。

Kafka Connect可以將完整的數據庫注入到Kafka的Topic中,或者將服務器的系統監控指標注入到Kafka,然后像正常的Kafka流處理機制一樣進行數據流處理。

而導出工作則是將數據從Kafka Topic中導出到其它數據存儲系統、查詢系統或者離線分析系統等,比如數據庫、 Elastic Search、 Apache Ignite等。

KafkaConnect有兩個核心概念:Source和Sink。 Source負責導入數據到Kafka,Sink負責從Kafka導出數據,它們都被稱為Connector。

kafkaConnect通過Jest實現Kafka對接Elasticsearch。

4、kafka connector安裝

實操非研究性的目的,不建議源碼安裝。?
直接從官網down confluent安裝即可。地址:https://www.confluent.io/download/

如下,解壓后既可以使用。

[root@kafka_no1 confluent-3.3.0]# pwd /home/confluent/confluent-3.3.0[root@kafka_no1 confluent-3.3.0]# ls -al total 32 drwxrwxr-x. 7 root root 4096 Dec 16 10:08 . drwxr-xr-x. 3 root root 4096 Dec 20 15:34 .. drwxr-xr-x. 3 root root 4096 Jul 28 08:30 bin drwxr-xr-x. 18 root root 4096 Jul 28 08:30 etc drwxr-xr-x. 2 root root 4096 Dec 21 15:34 logs -rw-rw-r--. 1 root root 871 Jul 28 08:45 README drwxr-xr-x. 10 root root 4096 Jul 28 08:30 share drwxrwxr-x. 2 root root 4096 Jul 28 08:45 src
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

5、kafka connector模式

Kafka connect 有兩種工作模式?
1)standalone:在standalone模式中,所有的worker都在一個獨立的進程中完成。

2)distributed:distributed模式具有高擴展性,以及提供自動容錯機制。你可以使用一個group.ip來啟動很多worker進程,在有效的worker進程中它們會自動的去協調執行connector和task,如果你新加了一個worker或者掛了一個worker,其他的worker會檢測到然后在重新分配connector和task。

6、kafka connector同步步驟

前提:

$ confluent start
  • 1

如下的服務都需要啟動:

Starting zookeeper zookeeper is [UP] ——對應端口:2181 Starting kafka kafka is [UP]——對應端口:9092 Starting schema-registry schema-registry is [UP]——對應端口:8081 Starting kafka-rest kafka-rest is [UP] Starting connect connect is [UP]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

可以,netstat -natpl 查看端口是否監聽ok。

步驟1:創建topic

./kafka-topics.sh --create --zookeeper 110.118.7.11 :2181 --replication-factor 3 --partitions 1 --topic test-elasticsearch-sink
  • 1

步驟2:生產者發布消息

假定avrotest topic已經創建。

./kafka-avro-console-producer --broker-list 110.118.7.11:9092 --topic test-elasticsearch-sink \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'{"f1": "value1"} {"f1": "value2"} {"f1": "value3"}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

步驟3:消費者訂閱消息測試(驗證生產者消息可以接收到)

./kafka-avro-console-consumer --bootstrap-server 110.118.7.11:9092 :9092 --topic test-elasticsearch-sink --from-beginning
  • 1

步驟4:connector傳輸數據操作到ES

./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties \ ../etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
  • 1
  • 2

注意此處: connect-standalone模式,對應 connect-avro-standalone.properties要修改;?
如果使用connect-distribute模式,對應的connect-avro-distribute.properties要修改。?
這里 quickstart-elasticsearch.properties :啟動到目的Elasticsearch配置。

quickstart-elasticsearch.properties**設置**:

name=elasticsearch-sink connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 #kafka主題名稱,也是對應Elasticsearch索引名稱 topics= test-elasticsearch-sinkkey.ignore=true #ES url信息 connection.url=http://110.18.6.20:9200 #ES type.name固定 type.name=kafka-connect
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

7、同步效果。

curl -XGET 'http:// 110.18.6.20 :9200/test-elasticsearch-sink/_search?pretty'

8、連接信息查詢REST API

  • -
GET /connectors – 返回所有正在運行的connector名。 - POST /connectors – 新建一個connector; 請求體必須是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必須包含你的connector的配置信息。 - GET /connectors/{name} – 獲取指定connetor的信息。 - GET /connectors/{name}/config – 獲取指定connector的配置信息。 - PUT /connectors/{name}/config – 更新指定connector的配置信息。 - GET /connectors/{name}/status – 獲取指定connector的狀態,包括它是否在運行、停止、或者失敗,如果發生錯誤,還會列出錯誤的具體信息。 - GET /connectors/{name}/tasks – 獲取指定connector正在運行的task。 - GET /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態信息。 - PUT /connectors/{name}/pause – 暫停connector和它的task,停止數據處理知道它被恢復。 - PUT /connectors/{name}/resume – 恢復一個被暫停的connector。 - POST /connectors/{name}/restart – 重啟一個connector,尤其是在一個connector運行失敗的情況下比較常用 - POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個task,一般是因為它運行失敗才這樣做。 - DELETE /connectors/{name} – 刪除一個connector,停止它的所有task并刪除配置。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

9、小結。

他山之石,可以攻玉。?
kafka上的小學生,繼續加油!

參考:

[1]kafka-connect部署及簡介:http://t.cn/RiUCaWx?
[2]connector介紹:http://orchome.com/344?
[3]英文-同步介紹http://t.cn/RYeZm7P?
[4]部署&開發http://t.cn/RTeyOEl?
[5]confluent生態鏈http://t.cn/RTebVyL?
[6]快速啟動參考:https://docs.confluent.io/3.3.0/quickstart.html?
[7]ES-connector:http://t.cn/RTecXmc

總結

以上是生活随笔為你收集整理的30.kafka数据同步Elasticsearch深入详解(ES与Kafka同步)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。