日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ClickHouse表引擎之Integration系列

發(fā)布時(shí)間:2024/7/5 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ClickHouse表引擎之Integration系列 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

? Integration系統(tǒng)表引擎主要用于將外部數(shù)據(jù)導(dǎo)入到ClickHouse中,或者在ClickHouse中直接操作外部數(shù)據(jù)源。

1 Kafka

1.1 Kafka引擎

? 將Kafka Topic中的數(shù)據(jù)直接導(dǎo)入到ClickHouse。

? 語法如下:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] (name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],... ) ENGINE = Kafka() SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_row_delimiter = 'delimiter_symbol',][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0]

? 參數(shù)說明:

? ①必需的參數(shù)

參數(shù)說明
kafka_broker_listKafka broker列表,以逗號分隔
kafka_topic_listKafka topic列表
kafka_group_nameKafka消費(fèi)者組,如果不希望消息在集群中重復(fù),使用相同的組名
kafka_format消息格式。使用與SQL格式函數(shù)相同的符號,例如JSONEachRow

? ②可選參數(shù)

參數(shù)說明
kafka_row_delimiter分隔符字符,用于一行的結(jié)束標(biāo)識符號
kafka_schema如果kafka_format參數(shù)需要schema定義,則通過該參數(shù)來支持
kafka_num_consumers每張表的消費(fèi)者個(gè)數(shù)。默認(rèn)值:1。如果一個(gè)使用者的吞吐量不足,則指定更多使用者。使用者的總數(shù)不應(yīng)該超過主題中的分區(qū)數(shù),因?yàn)槊總€(gè)分區(qū)只能分配一個(gè)使用者。
kafka_max_block_size輪詢的最大批處理大小
kafka_skip_broken_messages忽略無效記錄的條數(shù)。默認(rèn)值:0
kafka_commit_every_batch在編寫整個(gè)塊之后提交每個(gè)使用和處理的批而不是單個(gè)提交(默認(rèn)值:0)

? 測試:(1)建表

CREATE TABLE test_kafka (\timestamp UInt64,\level String,\message String\) ENGINE = Kafka() SETTINGS kafka_broker_list = 'ambari01:6667,ambari02:6667,ambari03:6667',\kafka_topic_list = 'test',\kafka_group_name = 'group1',\kafka_format = 'JSONEachRow',\kafka_row_delimiter = '\n'

? 注意:如果后面在查詢過程中報(bào)如下錯(cuò)誤。是因?yàn)橛行┮姘姹敬嬖诘?#xff0c;消息中數(shù)據(jù)之間的分割符號未指定,導(dǎo)致無法處理。解決辦法: 添加 kafka_row_delimiter = ‘\n’。

Cannot parse input: expected { before: \0: (at row 2)

? (2)在kafka建立一個(gè)新的topic

sh kafka-topics.sh --create --zookeeper ambari01:2181,ambari02:2181,ambari03:2181 --replication-factor 1 --partitions 3 --topic test

? (3)在kafka建立發(fā)布者console-producer

sh kafka-console-producer.sh --broker-list ambari01:6667,ambari02:6667,ambari03:6667 --topic test

? (4)發(fā)送數(shù)據(jù)

{"timestamp":1515897460,"level":"one","message":"aa"}

? 注意:由于一個(gè)kafka的partition 只能由一個(gè) group consumer 消費(fèi),所以clickhouse 節(jié)點(diǎn)數(shù)需要大于 topic 的 partition 數(shù)。

? (5)第一次查詢

SELECT * FROM test_kafka ┌──timestamp─┬─level─┬─message─┐ │ 1515897460 │ one │ aa │ └────────────┴───────┴─────────┘

? (6)第二次查詢

SELECT * FROM test_kafka Ok.

? 發(fā)現(xiàn)第二次查詢的時(shí)候沒有數(shù)據(jù)了,因?yàn)?Kafka引擎 表只是 kafka 流的一個(gè)視圖而已,當(dāng)數(shù)據(jù)被 select 了一次之后,這個(gè)數(shù)據(jù)就會(huì)被認(rèn)為已經(jīng)消費(fèi)了,下次 select 就不會(huì)再出現(xiàn)。所以Kafka表單獨(dú)使用是沒什么用的,一般是用來和 MaterialView 配合,將Kafka表里面的數(shù)據(jù)自動(dòng)導(dǎo)入到 MaterialView 里面。

? (7)與 MaterialView 集成

? 我們現(xiàn)在每一節(jié)點(diǎn)建一個(gè) MaterialView 保存 Kafka 里面的數(shù)據(jù), 再建一個(gè)全局的Distributed表。

CREATE MATERIALIZED VIEW test_kafka_view ENGINE = SummingMergeTree() PARTITION BY day ORDER BY (day, level) AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as count FROM test_kafka GROUP BY day, level;

? (6)再次發(fā)送數(shù)據(jù)

{"timestamp":1515897461,"level":"2","message":'bb'} {"timestamp":1515897462,"level":"3","message":'cc'} {"timestamp":1515897462,"level":"3","message":'ee'} {"timestamp":1515897463,"level":"4","message":'dd'}

? (7)查詢數(shù)據(jù)

SELECT * FROM test_kafka Ok.0 rows in set. Elapsed: 2.686 sec. --------------------------------------- SELECT * FROM test_kafka_view Ok.0 rows in set. Elapsed: 0.002 sec.

? 發(fā)現(xiàn)沒有數(shù)據(jù),原因:kafka 引擎默認(rèn)消費(fèi)根據(jù)條數(shù)與時(shí)間進(jìn)行入庫,不然肯定是沒效率的。其中對應(yīng)的參數(shù)有兩個(gè)。 max_insert_block_size(默認(rèn)值為: 1048576),stream_flush_interval_ms(默認(rèn)值為: 7500)這兩個(gè)參數(shù)都是全局性的。

? 業(yè)務(wù)系統(tǒng)需要從kafka讀取數(shù)據(jù),按照官方文檔建好表后,也能看到數(shù)據(jù),但是延時(shí)很高。基本要延時(shí)15分鐘左右。kafka的數(shù)據(jù)大約每秒50條左右。基本規(guī)律是累計(jì)到65535行以后(最小的塊大小)才會(huì)在表中顯示數(shù)據(jù)。嘗試更改stream_flush_interval_ms 沒有作用,但是有不想改max_block_size,因?yàn)樾薷囊院笥绊懙饺炙斜?#xff0c;并且影響搜索效率。希望能每N秒保證不管block有沒有寫滿都flush一次。

? 雖然ClickHouse和 Kafka的配合可以說是十分的便利,只有配置好,但是相當(dāng)?shù)木窒扌詫?kafka 數(shù)據(jù)格式的支持也有限。下面介紹WaterDrop這個(gè)中間件將Kafka的數(shù)據(jù)接入ClickHouse。

?

1.2 WaterDrop

? WaterDrop: 是一個(gè)非常易用,高性能、支持實(shí)時(shí)流式和離線批處理的海量數(shù)據(jù)處理產(chǎn)品,架構(gòu)于Apache Spark和 Apache Flink之上。github地址:https://github.com/InterestingLab/waterdrop

? ①下載并解壓

wget https://github.com/InterestingLab/waterdrop/releases/download/v1.4.3/waterdrop-1.4.3.zip unzip waterdrop-1.4.3.zip

? ②修改配置文件waterdrop-env.sh

vim /opt/module/waterdrop-1.4.3/config/waterdrop-env.sh SPARK_HOME=/usr/jdp/3.2.0.0-108/spark2 #配置為spark的路徑

? ③增加配置文件test.conf

spark {spark.streaming.batchDuration = 5spark.app.name = "test_waterdrop"spark.ui.port = 14020spark.executor.instances = 3spark.executor.cores = 1spark.executor.memory = "1g" }input {kafkaStream {topics = "test_wd"consumer.bootstrap.servers = "10.0.0.50:6667,10.0.0.52:6667,10.0.0.53:6667"consumer.zookeeper.connect = "10.0.0.50:2181,10.0.0.52:2181,10.0.0.53:2181"consumer.group.id = "group1"consumer.failOnDataLoss = falseconsumer.auto.offset.reset = latestconsumer.rebalance.max.retries = 100} } filter {json{source_field = "raw_message"} }output {clickhouse {host = "10.0.0.50:8123"database = "test"table = "test_wd"fields = ["act","b_t","s_t"]username = "admin"password = "admin"retry_codes = [209, 210 ,1002]retry = 10bulk_size = 1000} }

? ④創(chuàng)建Clickhouse表

create table test.test_wd( act String, b_t String, s_t Date) ENGINE = MergeTree() partition by s_t order by s_t;

? ⑤啟動(dòng)寫入程序

cd /data/work/waterdrop-1.4.1 sh /opt/module/waterdrop-1.4.3/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /opt/module/waterdrop-1.4.3/config/test.conf

? ⑥插入數(shù)據(jù)

{"act":"aaaa","b_t":"100","s_t":"2019-12-22"} {"act":"bxc","b_t":"200","s_t":"2020-01-01"} {"act":"dd","b_t":"50","s_t":"2020-02-01"}

? ⑦查看表數(shù)據(jù)

SELECT * FROM test_wd ┌─act─┬─b_t─┬────────s_t─┐ │ dd │ 50 │ 2020-02-01 │ └─────┴─────┴────────────┘ ┌─act──┬─b_t─┬────────s_t─┐ │ aaaa │ 100 │ 2019-12-22 │ └──────┴─────┴────────────┘ ┌─act─┬─b_t─┬────────s_t─┐ │ bxc │ 200 │ 2020-01-01 │ └─────┴─────┴────────────┘

2 MySQL

? 將Mysql作為存儲(chǔ)引擎,可以對存儲(chǔ)在遠(yuǎn)程 MySQL 服務(wù)器上的數(shù)據(jù)執(zhí)行 select查詢

? 語法:

MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

? 參數(shù)說明

參數(shù)說明
host:portMySQL 服務(wù)器地址
database數(shù)據(jù)庫的名稱
table表名稱
user數(shù)據(jù)庫用戶
password用戶密碼
replace_query將 INSERT INTO 查詢是否替換為 REPLACE INTO 的標(biāo)志。如果 replace_query=1,則替換查詢
on_duplicate_clause將 ON DUPLICATE KEY UPDATE on_duplicate_clause 表達(dá)式添加到 INSERT 查詢語句中。

? 測試:

? 在Mysql中建表,并插入數(shù)據(jù)

CREATE TABLE `user` (`id` int(11) NOT NULL,`username` varchar(50) DEFAULT NULL,`sex` varchar(5) DEFAULT NULL )INSERT INTO user values(11,"zs","0"); INSERT INTO user values(12,"ls","0"); INSERT INTO user values(13,"ww","0"); INSERT INTO user values(14,"ll","1");

? 創(chuàng)建ClickHouse表,insert_time字段為默認(rèn)字段

CREATE TABLE test.from_mysql(\id UInt64,\username String,\sex String,\insert_time Date DEFAULT toDate(now())\ ) ENGINE = MergeTree()\ PARTITION BY insert_time \ ORDER BY (id,username)

? 插入數(shù)據(jù)

INSERT INTO test.from_mysql (id,username,sex) SELECT id, username,sex FROM mysql('10.0.0.42:3306','test', 'user', 'root', 'admin');

? 查詢數(shù)據(jù)

SELECT * FROM from_mysql ┌─id─┬─username─┬─sex─┬─insert_time─┐ │ 11 │ zs │ 0 │ 2020-05-24 │ │ 12 │ ls │ 0 │ 2020-05-24 │ │ 13 │ ww │ 0 │ 2020-05-24 │ │ 14 │ ll │ 1 │ 2020-05-24 │ └────┴──────────┴─────┴─────────────┘4 rows in set. Elapsed: 0.003 sec.

3 HDFS

? 用戶通過執(zhí)行SQL語句,可以在ClickHouse中直接讀取HDFS的文件,也可以將讀取的數(shù)據(jù)導(dǎo)入到ClickHouse本地表。

? HDFS引擎:ENGINE = HDFS(URI, format)。URI:HDFS的URI,format:存儲(chǔ)格式,格式鏈接https://clickhouse.tech/docs/en/interfaces/formats/#formats

3.1 查詢文件

? 這種使用場景相當(dāng)于把HDFS做為ClickHouse的外部存儲(chǔ),當(dāng)查詢數(shù)據(jù)時(shí),直接訪問HDFS的文件,而不是把HDFS文件導(dǎo)入到ClickHouse再進(jìn)行查詢。相對于ClickHouse的本地存儲(chǔ)查詢,速度較慢。

? 在HDFS上新建一個(gè)數(shù)據(jù)文件:user.csv,上傳hadoop fs -cat /user/test/user.csv,內(nèi)容如下:

1,zs,18 2,ls,19 4,wu,25 3,zl,22

? 在ClickHouse上創(chuàng)建一個(gè)訪問user.csv文件的表:

CREATE TABLE test_hdfs_csv(\id UInt64,\name String,\age UInt8\ )ENGINE = HDFS('hdfs://ambari01:8020/user/test/user.csv', 'CSV')

? 查詢hdfs_books_csv表

SELECT * FROM test_hdfs_csv ┌─id─┬─name─┬─age─┐ │ 1 │ zs │ 18 │ │ 2 │ ls │ 19 │ │ 4 │ wu │ 25 │ │ 3 │ zl │ 22 │ └────┴──────┴─────┘

3.2 從HDFS導(dǎo)入數(shù)據(jù)

? 從HDFS導(dǎo)入數(shù)據(jù),數(shù)據(jù)在ClickHouse本地表,建本地表

CREATE TABLE test_hdfs_local(\id UInt64,\name String,\age UInt8\ )ENGINE = Log

? 在數(shù)據(jù)存儲(chǔ)目錄下可以找到這個(gè)表的文件夾

/data/clickhouse/data/test/test_hdfs_local

? 從HDFS導(dǎo)入數(shù)據(jù)

INSERT INTO test_hdfs_local SELECT * FROM test_hdfs_csv

? 查詢

SELECT * FROM test_hdfs_local ┌─id─┬─name─┬─age─┐ │ 1 │ zs │ 18 │ │ 2 │ ls │ 19 │ │ 4 │ wu │ 25 │ │ 3 │ zl │ 22 │ └────┴──────┴─────┘

總結(jié)

以上是生活随笔為你收集整理的ClickHouse表引擎之Integration系列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。