日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

發布時間:2024/1/16 windows 37 coder
生活随笔 收集整理的這篇文章主要介紹了 ClickHouse(21)ClickHouse集成Kafka表引擎详细解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄
  • Kafka表集成引擎
    • 配置
      • Kerberos 支持
    • 虛擬列
  • 資料分享
  • 參考文章

Kafka表集成引擎

此引擎與Apache Kafka結合使用。

Kafka 特性:

  • 發布或者訂閱數據流。
  • 容錯存儲機制。
  • 處理流數據。

老版Kafka集成表引擎參數格式:

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

新版Kafka集成表引擎參數格式:

Kafka SETTINGS
  kafka_broker_list = 'localhost:9092',
  kafka_topic_list = 'topic1,topic2',
  kafka_group_name = 'group1',
  kafka_format = 'JSONEachRow',
  kafka_row_delimiter = '\n',
  kafka_schema = '',
  kafka_num_consumers = 2

必要參數:

  • kafka_broker_list – 以逗號分隔的 brokers 列表 (localhost:9092)。
  • kafka_topic_list – topic 列表 (my_topic)。
  • kafka_group_name – Kafka 消費組名稱 (group1)。如果不希望消息在集群中重復,請在每個分片中使用相同的組名。
  • kafka_format – 消息體格式。使用與 SQL 部分的 FORMAT 函數相同表示方法,例如 JSONEachRow

可選參數:

  • kafka_row_delimiter - 每個消息體(記錄)之間的分隔符。
  • kafka_schema – 如果解析格式需要一個 schema 時,此參數必填。
  • kafka_num_consumers – 單個表的消費者數量。默認值是:1,如果一個消費者的吞吐量不足,則指定更多的消費者。消費者的總數不應該超過 topic 中分區的數量,因為每個分區只能分配一個消費者。

ClickHouse可以接受和返回各種格式的數據。受支持的輸入格式可用于提交給INSERT語句、從文件表(File,URL,HDFS或者外部目錄)執行SELECT語句,受支持的輸出格式可用于格式化SELECT語句的返回結果,或者通過INSERT寫入到文件表。

以下kafka_format是支持的格式,ClickHouse可以接受和返回各種格式的數據。受支持的輸入格式可用于提交給INSERT語句、從文件表(File,URL,HDFS或者外部目錄)執行SELECT語句,受支持的輸出格式可用于格式化SELECT語句的返回結果,或者通過INSERT寫入到文件表。

格式 輸入 輸出
[TabSeparated] ? ?
[TabSeparatedRaw] ? ?
[TabSeparatedWithNames] ? ?
[TabSeparatedWithNamesAndTypes] ? ?
[Template] ? ?
[TemplateIgnoreSpaces] ? ?
[CSV] ? ?
[CSVWithNames] ? ?
[CustomSeparated] ? ?
[Values] ? ?
[Vertical] ? ?
[JSON] ? ?
[JSONAsString] ? ?
[JSONStrings] ? ?
[JSONCompact] ? ?
[JSONCompactStrings] ? ?
[JSONEachRow] ? ?
[JSONEachRowWithProgress] ? ?
[JSONStringsEachRow] ? ?
[JSONStringsEachRowWithProgress] ? ?
[JSONCompactEachRow] ? ?
[JSONCompactEachRowWithNamesAndTypes] ? ?
[JSONCompactStringsEachRow] ? ?
[JSONCompactStringsEachRowWithNamesAndTypes] ? ?
[TSKV] ? ?
[Pretty] ? ?
[PrettyCompact] ? ?
[PrettyCompactMonoBlock] ? ?
[PrettyNoEscapes] ? ?
[PrettySpace] ? ?
[Protobuf] ? ?
[ProtobufSingle] ? ?
[Avro] ? ?
[AvroConfluent] ? ?
[Parquet] ? ?
[Arrow] ? ?
[ArrowStream] ? ?
[ORC] ? ?
[RowBinary] ? ?
[RowBinaryWithNamesAndTypes] ? ?
[Native] ? ?
[Null] ? ?
[XML] ? ?
[CapnProto] ? ?
[LineAsString] ? ?
[Regexp] ? ?
[RawBLOB] ? ?

示例:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;

消費的消息會被自動追蹤,因此每個消息在不同的消費組里只會記錄一次。如果希望獲得兩次數據,則使用另一個組名創建副本。

消費組可以靈活配置并且在集群之間同步。例如,如果群集中有10個主題和5個表副本,則每個副本將獲得2個主題。 如果副本數量發生變化,主題將自動在副本中重新分配。

SELECT 查詢對于讀取消息并不是很有用(調試除外),因為每條消息只能被讀取一次。使用物化視圖創建實時線程更實用。您可以這樣做:

  1. 使用引擎創建一個 Kafka 消費者并作為一條數據流。
  2. 創建一個結構表。
  3. 創建物化視圖,改視圖會在后臺轉換引擎中的數據并將其放入之前創建的表中。

MATERIALIZED VIEW 添加至引擎,它將會在后臺收集數據。可以持續不斷地從 Kafka 收集數據并通過 SELECT 將數據轉換為所需要的格式。

示例:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;

為了提高性能,接受的消息被分組為max_insert_block_size大小的塊。如果未在stream_flush_interval_ms毫秒內形成塊,則不關心塊的完整性,都會將數據刷新到表中。

停止接收主題數據或更改轉換邏輯,請 detach 物化視圖:

  DETACH TABLE consumer;
  ATTACH TABLE consumer;

如果使用 ALTER 更改目標表,為了避免目標表與視圖中的數據之間存在差異,推薦停止物化視圖。

配置

GraphiteMergeTree 類似,Kafka 引擎支持使用ClickHouse配置文件進行擴展配置。可以使用兩個配置鍵:全局 (kafka) 和 主題級別 (kafka_*)。首先應用全局配置,然后應用主題級配置(如果存在)。

  <!-- Global configuration options for all tables of Kafka engine type -->
  <kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
  </kafka>

  <!-- Configuration specific for topic "logs" -->
  <kafka_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
  </kafka_logs>

ClickHouse配置中使用下劃線 (_) ,并不是使用點 (.)。例如,check.crcs=true 將是 <check_crcs>true</check_crcs>

Kerberos 支持

對于使用了kerberos的kafka, 將security_protocol 設置為sasl_plaintext就夠了,如果kerberos的ticket是由操作系統獲取和緩存的。
clickhouse也支持自己使用keyfile的方式來維護kerbros的憑證。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三個子元素就可以。

示例:

  <!-- Kerberos-aware Kafka -->
  <kafka>
    <security_protocol>SASL_PLAINTEXT</security_protocol>
    <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
    <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
  </kafka>

虛擬列

  • _topic – Kafka 主題。
  • _key – 信息的鍵。
  • _offset – 消息的偏移量。
  • _timestamp – 消息的時間戳。
  • _timestamp_ms – 消息的時間戳(毫秒)。
  • _partition – Kafka 主題的分區。

資料分享

ClickHouse經典中文文檔分享

參考文章

  • ClickHouse(01)什么是ClickHouse,ClickHouse適用于什么場景
  • ClickHouse(02)ClickHouse架構設計介紹概述與ClickHouse數據分片設計
  • ClickHouse(03)ClickHouse怎么安裝和部署
  • ClickHouse(04)如何搭建ClickHouse集群
  • ClickHouse(05)ClickHouse數據類型詳解
  • ClickHouse(06)ClickHouse建表語句DDL詳細解析
  • ClickHouse(07)ClickHouse數據庫引擎解析
  • ClickHouse(08)ClickHouse表引擎概況
  • ClickHouse(09)ClickHouse合并樹MergeTree家族表引擎之MergeTree詳細解析
  • ClickHouse(10)ClickHouse合并樹MergeTree家族表引擎之ReplacingMergeTree詳細解析
  • ClickHouse(11)ClickHouse合并樹MergeTree家族表引擎之SummingMergeTree詳細解析
  • ClickHouse(12)ClickHouse合并樹MergeTree家族表引擎之AggregatingMergeTree詳細解析
  • ClickHouse(13)ClickHouse合并樹MergeTree家族表引擎之CollapsingMergeTree詳細解析
  • ClickHouse(14)ClickHouse合并樹MergeTree家族表引擎之VersionedCollapsingMergeTree詳細解析
  • ClickHouse(15)ClickHouse合并樹MergeTree家族表引擎之GraphiteMergeTree詳細解析
  • ClickHouse(16)ClickHouse日志引擎Log詳細解析
  • ClickHouse(17)ClickHouse集成JDBC表引擎詳細解析
  • ClickHouse(18)ClickHouse集成ODBC表引擎詳細解析
  • ClickHouse(19)ClickHouse集成Hive表引擎詳細解析
  • ClickHouse(20)ClickHouse集成PostgreSQL表引擎詳細解析

總結

以上是生活随笔為你收集整理的ClickHouse(21)ClickHouse集成Kafka表引擎详细解析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。