日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

Flink SQL CDC 13 条生产实践经验

發(fā)布時(shí)間:2024/3/13 数据库 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink SQL CDC 13 条生产实践经验 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

摘要:7月,Flink 1.11 新版發(fā)布,在生態(tài)及易用性上有大幅提升,其中 Table & SQL 開(kāi)始支持 Change Data Capture(CDC)。CDC 被廣泛使用在復(fù)制數(shù)據(jù)、更新緩存、微服務(wù)間同步數(shù)據(jù)、審計(jì)日志等場(chǎng)景,本文由社區(qū)由曾慶東同學(xué)分享,主要介紹 Flink SQL CDC 在生產(chǎn)環(huán)境的落地實(shí)踐以及總結(jié)的實(shí)戰(zhàn)經(jīng)驗(yàn),文章分為以下幾部分:

  • 項(xiàng)目背景

  • 解決方案

  • 項(xiàng)目運(yùn)行環(huán)境與現(xiàn)狀

  • 具體實(shí)現(xiàn)

  • 踩過(guò)的坑和學(xué)到的經(jīng)驗(yàn)

  • 總結(jié)

  • Tips:點(diǎn)擊下方鏈接可查看社區(qū)直播的 Flink SQL CDC 相關(guān)視頻~
    https://flink-learning.org.cn/developers/flink-training-course3/

    01 項(xiàng)目背景

    項(xiàng)目屬于公司里面數(shù)據(jù)密集、計(jì)算密集的一個(gè)重要項(xiàng)目,需要提供高效且準(zhǔn)確的 OLAP 服務(wù),提供靈活且實(shí)時(shí)的報(bào)表。業(yè)務(wù)數(shù)據(jù)存儲(chǔ)在 MySQL 中,通過(guò)主從復(fù)制同步到報(bào)表庫(kù)。作為集團(tuán)級(jí)公司,數(shù)據(jù)增長(zhǎng)多而且快,出現(xiàn)了多個(gè)千萬(wàn)級(jí)、億級(jí)的大表。為了實(shí)現(xiàn)各個(gè)維度的各種復(fù)雜的報(bào)表業(yè)務(wù),有些千萬(wàn)級(jí)大表仍然需要進(jìn)行 Join,計(jì)算規(guī)模非常驚人,經(jīng)常不能及時(shí)響應(yīng)請(qǐng)求。

    隨著數(shù)據(jù)量的日益增長(zhǎng)和實(shí)時(shí)分析的需求越來(lái)越大,急需對(duì)系統(tǒng)進(jìn)行流式計(jì)算、實(shí)時(shí)化改造。正是在這個(gè)背景下,開(kāi)始了我們與 Flink SQL CDC 的故事。

    ?

    02 解決方案

    針對(duì)平臺(tái)現(xiàn)在存在的問(wèn)題,我們提出了把報(bào)表的數(shù)據(jù)實(shí)時(shí)化的方案。該方案主要通過(guò) Flink SQL CDC + Elasticsearch 實(shí)現(xiàn)。Flink SQL 支持 CDC 模式的數(shù)據(jù)同步,將 MySQL 中的全增量數(shù)據(jù)實(shí)時(shí)地采集、預(yù)計(jì)算、并同步到 Elasticsearch 中,Elasticsearch 作為我們的實(shí)時(shí)報(bào)表和即席分析引擎。項(xiàng)目整體架構(gòu)圖如下所示:

    ?

    實(shí)時(shí)報(bào)表實(shí)現(xiàn)具體思路是,使用 Flink CDC 讀取全量數(shù)據(jù),全量數(shù)據(jù)同步完成后,Flink CDC 會(huì)無(wú)縫切換至 MySQL 的 binlog 位點(diǎn)繼續(xù)消費(fèi)增量的變更數(shù)據(jù),且保證不會(huì)多消費(fèi)一條也不會(huì)少消費(fèi)一條。讀取到的賬單和訂單的全增量數(shù)據(jù)會(huì)與產(chǎn)品表做關(guān)聯(lián)補(bǔ)全信息,并做一些預(yù)聚合,然后將聚合結(jié)果輸出到 Elasticsearch,前端頁(yè)面只需要到 Elasticsearch 通過(guò)精準(zhǔn)匹配(terms)查找數(shù)據(jù),或者再使用 agg 做高維聚合統(tǒng)計(jì)得到多個(gè)服務(wù)中心的報(bào)表數(shù)據(jù)。

    ?

    從整體架構(gòu)中,可以看到,Flink SQL 及其 CDC 功能在我們的架構(gòu)中扮演著核心角色。我們采用 Flink SQL CDC,而不是 Canal + Kafka 的傳統(tǒng)架構(gòu),主要原因還是因?yàn)槠湟蕾嚱M件少,維護(hù)成本低,開(kāi)箱即用,上手容易。具體來(lái)說(shuō) Flink SQL CDC 是一個(gè)集采集、計(jì)算、傳輸于一體的工具,其吸引我們的優(yōu)點(diǎn)有:

    ① 減少維護(hù)的組件、簡(jiǎn)化實(shí)現(xiàn)鏈路;?

    ② 減少端到端延遲;?

    ③ 減輕維護(hù)成本和開(kāi)發(fā)成本;?

    ④ 支持 Exactly Once 的讀取和計(jì)算(由于我們是賬務(wù)系統(tǒng),所以數(shù)據(jù)一致性非常重要);?

    ⑤ 數(shù)據(jù)不落地,減少存儲(chǔ)成本;?

    ⑥ 支持全量和增量流式讀取;

    ?

    項(xiàng)目使用的是 flink-cdc-connectors 中提供的 mysql-cdc 組件。這是一個(gè) Flink 數(shù)據(jù)源,支持對(duì) MySQL 數(shù)據(jù)庫(kù)的全量和增量讀取。它在掃描全表前會(huì)先加一個(gè)全局讀鎖,然后獲取此時(shí)的 binlog position,緊接著釋放全局讀鎖。隨后開(kāi)始掃描全表,當(dāng)全表快照讀取完后,會(huì)從之前獲取的 binlog position 獲取增量的變更記錄。因此這個(gè)讀鎖是非常輕量的,持鎖時(shí)間非常短,不會(huì)對(duì)線上業(yè)務(wù)造成太大影響。更多信息可以參考 flink-cdc-connectors 項(xiàng)目官網(wǎng):https://github.com/ververica/flink-cdc-connectors。

    ?

    03 項(xiàng)目運(yùn)行環(huán)境與現(xiàn)狀

    我們?cè)谏a(chǎn)環(huán)境搭建了 Hadoop + Flink + Elasticsearch 分布式環(huán)境,采用的 Flink on YARN 的 per-job 模式運(yùn)行,使用 RocksDB 作為 state backend,HDFS 作為 checkpoint 持久化地址,并且做好了 HDFS 的容錯(cuò),保證 checkpoint 數(shù)據(jù)不丟失。我們使用 SQL Client 提交作業(yè),所有作業(yè)統(tǒng)一使用純 SQL,沒(méi)有寫(xiě)一行 Java 代碼。

    ?

    目前已上線了 3 個(gè)基于 Flink CDC 的作業(yè),已穩(wěn)定在線上運(yùn)行了兩個(gè)星期,并且業(yè)務(wù)產(chǎn)生的訂單實(shí)收和賬單實(shí)收數(shù)據(jù)能實(shí)時(shí)聚合輸出到 Elasticsearch,輸出的數(shù)據(jù)準(zhǔn)確無(wú)誤。現(xiàn)在也正在對(duì)其他報(bào)表采用 Flink SQL CDC 進(jìn)行實(shí)時(shí)化改造,替換舊的業(yè)務(wù)系統(tǒng),讓系統(tǒng)數(shù)據(jù)更實(shí)時(shí)。

    ?

    04 具體實(shí)現(xiàn)

    ① 進(jìn)入 Flink/bin,使用 ./sql-client.sh embedded 啟動(dòng) SQL CLI 客戶端。?

    ② 使用 DDL 創(chuàng)建 Flink Source 和 Sink 表。這里創(chuàng)建的表字段個(gè)數(shù)不一定要與 MySQL 的字段個(gè)數(shù)和順序一致,只需要挑選 MySQL 表中業(yè)務(wù)需要的字段即可,并且字段類型保持一致。

    -- 在Flink創(chuàng)建賬單實(shí)收source表CREATE TABLE bill_info ( billCode STRING, serviceCode STRING, accountPeriod STRING, subjectName STRING , subjectCode STRING, occurDate TIMESTAMP, amt DECIMAL(11,2), status STRING, proc_time AS PROCTIME() -–使用維表時(shí)需要指定該字段) WITH ( 'connector' = 'mysql-cdc', -- 連接器 'hostname' = '******', --mysql地址 'port' = '3307', -- mysql端口 'username' = '******', --mysql用戶名 'password' = '******', -- mysql密碼 'database-name' = 'cdc', -- 數(shù)據(jù)庫(kù)名稱 'table-name' = '***'); -- 在Flink創(chuàng)建訂單實(shí)收source表CREATE TABLE order_info ( orderCode STRING, serviceCode STRING, accountPeriod STRING, subjectName STRING , subjectCode STRING, occurDate TIMESTAMP, amt DECIMAL(11, 2), status STRING, proc_time AS PROCTIME() -–使用維表時(shí)需要指定該字段) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '******', 'port' = '3307', 'username' = '******', 'password' = '******', 'database-name' = 'cdc', 'table-name' = '***',); -- 創(chuàng)建科目維表CREATE TABLE subject_info ( code VARCHAR(32) NOT NULL, name VARCHAR(64) NOT NULL, PRIMARY KEY (code) NOT ENFORCED --指定主鍵) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true', 'driver' = 'com.mysql.cj.jdbc.Driver', 'table-name' = '***', 'username' = '******', 'password' = '******', 'lookup.cache.max-rows' = '3000', 'lookup.cache.ttl' = '10s', 'lookup.max-retries' = '3'); -- 創(chuàng)建實(shí)收分布結(jié)果表,把結(jié)果寫(xiě)到 ElasticsearchCREATE TABLE income_distribution ( serviceCode STRING, accountPeriod STRING, subjectCode STRING, subjectName STRING, amt DECIMAL(13,2), PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://xxxx:9200', 'index' = 'income_distribution', 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL');

    以上的建表 DDL 分別創(chuàng)建了訂單實(shí)收 source 表、賬單實(shí)收 source 表、產(chǎn)品科目維表和 Elasticsearch 結(jié)果表。建表完成后,Flink 是不會(huì)馬上去同步 MySQL 的數(shù)據(jù),而是等到用戶提交了一個(gè) insert 作業(yè)后才會(huì)執(zhí)行同步數(shù)據(jù),并且 Flink 不會(huì)存儲(chǔ)數(shù)據(jù)。我們的第一個(gè)作業(yè)是計(jì)算收入分布,數(shù)據(jù)來(lái)源于 bill_info 和 order_info 兩張 MySQL 表,并且賬單實(shí)收表和訂單實(shí)收表都需要關(guān)聯(lián)維表數(shù)據(jù)獲取應(yīng)收科目的最新中文名稱,按照服務(wù)中心、賬期、科目代碼和科目名稱進(jìn)行分組計(jì)算實(shí)收金額的 sum 值,實(shí)收分布具體 DML 如下:

    INSERT INTO income_distributionSELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(amt) AS amt FROM ( SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt FROM bill_info AS b JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.nameUNION ALL SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt FROM order_info AS b JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name) AS t1GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;

    Flink SQL 的維表 JOIN 和雙流 JOIN 寫(xiě)法上不太一樣,對(duì)于維表,還需要在 Flink source table 上添加一個(gè) proctime 字段 proc_time AS PROCTIME(),關(guān)聯(lián)的時(shí)候使用 FOR SYSTEM_TIME AS OF 的 SQL 語(yǔ)法查詢時(shí)態(tài)表,意思是關(guān)聯(lián)查詢最新版本的維表數(shù)據(jù)。

    關(guān)于維表 JOIN 的使用可參閱:

    https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/joins.html。

    ?

    ③ 在 SQL Client 執(zhí)行以上作業(yè)后,YARN 會(huì)創(chuàng)建一個(gè) Flink 集群運(yùn)行作業(yè),并且用戶可以在 Hadoop 上查看到執(zhí)行作業(yè)的所有信息,并且能進(jìn)入 Flink 的 Web UI 頁(yè)面查看 Flink 作業(yè)詳情,以下是 Hadoop 所有作業(yè)情況。

    ?

    ④ 作業(yè)提交后,Flink SQL CDC 會(huì)掃描指定的 MySQL 表,在這期間 Flink 也會(huì)進(jìn)行 checkpoint,所以需要按照上文所述的配置 checkpoint 的重試策略和重試次數(shù)。當(dāng)數(shù)據(jù)被讀取進(jìn) Flink 后,Flink 會(huì)流式地進(jìn)行作業(yè)邏輯的計(jì)算,實(shí)時(shí)統(tǒng)計(jì)出聚合結(jié)果輸出到 Elasticsearch(sink 端)。相當(dāng)于我們使用 Flink 在 MySQL 的表上維護(hù)了一個(gè)實(shí)時(shí)的物化視圖,并將這個(gè)實(shí)時(shí)物化視圖的結(jié)果存在了 Elasticsearch 中。在 Elasticsearch 中使用 GET /income_distribution/_search{ "query": {"match_all": {}}} 命令查看輸出的實(shí)收分布結(jié)果,如下圖:

    通過(guò)圖中的結(jié)果可以看出聚合結(jié)果被實(shí)時(shí)的計(jì)算出來(lái),并寫(xiě)到了 Elasticsearch 中了。

    ?

    05 踩過(guò)的坑和學(xué)到的經(jīng)驗(yàn)

    ?

    1. Flink 作業(yè)原來(lái)運(yùn)行在 standalone session 模式下,提交多個(gè) Flink 作業(yè)會(huì)導(dǎo)致作業(yè)失敗報(bào)錯(cuò)。

    • 原因:因?yàn)?standalone session 模式下啟動(dòng)多個(gè)作業(yè)會(huì)導(dǎo)致多個(gè)作業(yè)的 Task 共享一個(gè) JVM,可能會(huì)導(dǎo)致一些不穩(wěn)定的問(wèn)題。并且排查問(wèn)題時(shí),多個(gè)作業(yè)的日志混在一個(gè) TaskManager 中,增加了排查的難度。

    • 解決方法:采用 YARN 的 per-job 模式啟動(dòng)多個(gè)作業(yè),能有更好的隔離性。

    ?

    2. SELECT elasticsearch table 報(bào)以下錯(cuò)誤:

    • 原因:Elasticsearch connector 目前只支持了 sink,不支持 source 。所以不能 SELECT elasticsearch table。
    • ?

    3. 在 flink-conf.yaml 里修改默認(rèn)并行度,但是在 Web UI 看到作業(yè)的并行度還是 1,并行度修改不生效。

    • 解決辦法:在使用 SQL Client 時(shí) sql-client-defaults.yaml 中的并行度配置的優(yōu)先級(jí)更高。在 sql-client-defaults.yaml 中修改并行度,或者刪除 sql-client-defaults.yaml 中的并行度配置。更建議采用后者。

    ?

    4. Flink 作業(yè)在掃描 MySQL 全量數(shù)據(jù)時(shí),checkpoint 超時(shí),出現(xiàn)作業(yè) failover,如下圖:

    • 原因:Flink CDC 在 scan 全表數(shù)據(jù)(我們的實(shí)收表有千萬(wàn)級(jí)數(shù)據(jù))需要小時(shí)級(jí)的時(shí)間(受下游聚合反壓影響),而在 scan 全表過(guò)程中是沒(méi)有 offset 可以記錄的(意味著沒(méi)法做 checkpoint),但是 Flink 框架任何時(shí)候都會(huì)按照固定間隔時(shí)間做 checkpoint,所以此處 mysql-cdc source 做了比較取巧的方式,即在 scan 全表的過(guò)程中,會(huì)讓執(zhí)行中的 checkpoint 一直等待甚至超時(shí)。超時(shí)的 checkpoint 會(huì)被仍未認(rèn)為是 failed checkpoint,默認(rèn)配置下,這會(huì)觸發(fā) Flink 的 failover 機(jī)制,而默認(rèn)的 failover 機(jī)制是不重啟。所以會(huì)造成上面的現(xiàn)象。
    • 解決辦法:在 flink-conf.yaml 配置 failed checkpoint 容忍次數(shù),以及失敗重啟策略,如下:

    execution.checkpointing.interval: 10min # checkpoint間隔時(shí)間execution.checkpointing.tolerable-failed-checkpoints: 100 # checkpoint 失敗容忍次數(shù)restart-strategy: fixed-delay # 重試策略restart-strategy.fixed-delay.attempts: 2147483647 # 重試次數(shù)

    目前 Flink 社區(qū)也有一個(gè) issue(FLINK-18578)來(lái)支持 source 主動(dòng)拒絕 checkpoint 的機(jī)制,將來(lái)基于該機(jī)制,能比較優(yōu)雅地解決這個(gè)問(wèn)題。

    ?

    5. Flink 怎么樣開(kāi)啟 YARN 的 per-job 模式?

    • 解決方法:在 flink-conf.yaml 中配置 execution.target: yarn-per-job。

    ?

    6. 進(jìn)入 SQL Client 創(chuàng)建 table 后,在另外一個(gè)節(jié)點(diǎn)進(jìn)入 SQL Client 查詢不到 table。

    • 原因:因?yàn)?SQL Client 默認(rèn)的 Catalog 是在 in-memory 的,不是持久化? Catalog,所以這屬于正?,F(xiàn)象,每次啟動(dòng) Catalog 里面都是空的。

    ?

    7. 作業(yè)在運(yùn)行時(shí) Elasticsearch 報(bào)如下錯(cuò)誤:

    Caused by: org.apache.Flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=illegal_argument_exception, reason=mapper [amt] cannot be changed from type [long] to [float]]

    • 原因:數(shù)據(jù)庫(kù)表的字段 amt 的類型是 decimal,DDL 創(chuàng)建輸出到 es 的 amt 字段的類型也是 decimal,因?yàn)檩敵龅?es 的第一條數(shù)據(jù)的amt如果是整數(shù),比如是 10,輸出到 es 的類型是 long 類型的,es client 會(huì)自動(dòng)創(chuàng)建 es 的索引并且設(shè)置 amt 字段為 long 類型的格式,那么如果下一次輸出到 es 的 amt 是非整數(shù) 10.1,那么輸出到 es 的時(shí)候就會(huì)出現(xiàn)類型不匹配的錯(cuò)誤。

    • 解決方法:手動(dòng)生成 es 索引和 mapping 的信息,指定好 decimal 類型的數(shù)據(jù)格式是 saclefloat,但是在 DDL 處仍然可以保留該字段類型是 decimal。

    ?

    8. 作業(yè)在運(yùn)行時(shí) mysql cdc source 報(bào)如下錯(cuò)誤:

    ?

    • 原因:因?yàn)閿?shù)據(jù)庫(kù)中別的表做了字段修改,CDC source 同步到了 ALTER DDL 語(yǔ)句,但是解析失敗拋出的異常。

    • 解決方法:在 flink-cdc-connectors 最新版本中已經(jīng)修復(fù)該問(wèn)題(跳過(guò)了無(wú)法解析的 DDL)。升級(jí) connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替換 flink/lib 下的舊包。

    ?

    9. 掃描全表階段慢,在 Web UI 出現(xiàn)如下現(xiàn)象:

    ?

    • 原因:掃描全表階段慢不一定是 cdc source 的問(wèn)題,可能是下游節(jié)點(diǎn)處理太慢反壓了。

    • 解決方法:通過(guò) Web UI 的反壓工具排查發(fā)現(xiàn),瓶頸主要在聚合節(jié)點(diǎn)上。通過(guò)在 sql-client-defaults.yaml 文件配上 MiniBatch 相關(guān)參數(shù)和開(kāi)啟 distinct 優(yōu)化(我們的聚合中有 count distinct),作業(yè)的 scan 效率得到了很大的提升,從原先的 10 小時(shí),提升到了 1 小時(shí)。關(guān)于性能調(diào)優(yōu)的參數(shù)可以參閱:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/tuning/streaming_aggregation_optimization.html。

    configuration: table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 2s table.exec.mini-batch.size: 5000 table.optimizer.distinct-agg.split.enabled: true

    ?

    10. CDC source 掃描 MySQL 表期間,發(fā)現(xiàn)無(wú)法往該表 insert 數(shù)據(jù)。

    • 原因:由于使用的 MySQL 用戶未授權(quán) RELOAD 權(quán)限,導(dǎo)致無(wú)法獲取全局讀鎖(FLUSH TABLES WITH READ LOCK), CDC source 就會(huì)退化成表級(jí)讀鎖,而使用表級(jí)讀鎖需要等到全表 scan 完,才能釋放鎖,所以會(huì)發(fā)現(xiàn)持鎖時(shí)間過(guò)長(zhǎng)的現(xiàn)象,影響其他業(yè)務(wù)寫(xiě)入數(shù)據(jù)。

    • 解決方法:給使用的 MySQL 用戶授予 RELOAD 權(quán)限即可。所需的權(quán)限列表詳見(jiàn)文檔:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server。如果出于某些原因無(wú)法授予 RELOAD 權(quán)限,也可以顯式配上 'debezium.snapshot.locking.mode' = 'none'來(lái)避免所有鎖的獲取,但要注意只有當(dāng)快照期間表的 schema 不會(huì)變更才安全。

    ?

    11. 多個(gè)作業(yè)共用同一張 source table 時(shí),沒(méi)有修改 server id 導(dǎo)致讀取出來(lái)的數(shù)據(jù)有丟失。

    • 原因:MySQL binlog 數(shù)據(jù)同步的原理是,CDC source 會(huì)偽裝成 MySQL 集群的一個(gè) slave(使用指定的 server id 作為唯一 id),然后從 MySQL 拉取 binlog 數(shù)據(jù)。如果一個(gè) MySQL 集群中有多個(gè) slave 有同樣的 id,就會(huì)導(dǎo)致拉取數(shù)據(jù)錯(cuò)亂的問(wèn)題。

    • 解決方法:默認(rèn)會(huì)隨機(jī)生成一個(gè) server id,容易有碰撞的風(fēng)險(xiǎn)。所以建議使用動(dòng)態(tài)參數(shù)(table hint)在 query 中覆蓋 server id。如下所示:

    SELECT *FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;

    ?

    12. 在啟動(dòng)作業(yè)時(shí),YARN 接收了任務(wù),但作業(yè)一直未啟動(dòng):

    ?

    • 原因:Queue Resource Limit for AM 超過(guò)了限制資源限制。默認(rèn)的最大內(nèi)存是 30G (集群內(nèi)存) * 0.1 = 3G,而每個(gè) JM 申請(qǐng) 2G 內(nèi)存,當(dāng)提交第二個(gè)任務(wù)時(shí),資源就不夠了。

    • 解決方法:調(diào)大 AM 的 resource limit,在 capacity-scheduler.xml 配置 yarn.scheduler.capacity.maximum-am-resource-percent,代表AM的占總資源的百分比,默認(rèn)為0.1,改成0.3(根據(jù)服務(wù)器的性能靈活配置)。

    ?

    13. AM 進(jìn)程起不來(lái),一直被 kill 掉。

    ?

    • 原因:386.9 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memory use。默認(rèn)物理內(nèi)存是 1GB,動(dòng)態(tài)申請(qǐng)到了 1GB,其中使用了386.9 MB。物理內(nèi)存 x 2.1=虛擬內(nèi)存,1GBx2.1≈2.1GB ,2.1GB 虛擬內(nèi)存已經(jīng)耗盡,當(dāng)虛擬內(nèi)存不夠時(shí)候,AM 的 container 就會(huì)自殺。

    • 解決方法:兩個(gè)解決方案,或調(diào)整 yarn.nodemanager.vmem-pmem-ratio 值大點(diǎn),或 yarn.nodemanager.vmem-check-enabled=false,關(guān)閉虛擬內(nèi)存檢查。參考:https://blog.csdn.net/lzxlfly/article/details/89175452。

    ?

    06 總結(jié)

    為了提升了實(shí)時(shí)報(bào)表服務(wù)的可用性和實(shí)時(shí)性,一開(kāi)始我們采用了 Canal+Kafka+Flink 的方案,可是發(fā)現(xiàn)需要寫(xiě)比較多的 Java 代碼,而且還需要處理好 DataStream 和 Table 的轉(zhuǎn)換以及 binlong 位置的獲取,開(kāi)發(fā)難度相對(duì)較大。另外,需要維護(hù) Kafka 和 Canal 這兩個(gè)組件的穩(wěn)定運(yùn)行,對(duì)于小團(tuán)隊(duì)來(lái)說(shuō)成本也不小。由于已經(jīng)有基于 Flink 的任務(wù)在線上運(yùn)行,因此采用 Flink SQL CDC 就成了順理成章的事情?;?Flink SQL CDC 的方案只需要編寫(xiě) SQL ,不用寫(xiě)一行 Java 代碼就能完成實(shí)時(shí)鏈路的打通和實(shí)時(shí)報(bào)表的計(jì)算,對(duì)于我們來(lái)說(shuō)非常的簡(jiǎn)單易用,而且在線上運(yùn)行的穩(wěn)定性和性能表現(xiàn)也讓我們滿意。

    公司內(nèi)大力推廣 Flink SQL CDC 的使用,也正在著手改造其他幾個(gè)實(shí)時(shí)鏈路的任務(wù)。非常感謝開(kāi)源社區(qū)能為我們提供如此強(qiáng)大的工具,也希望 Flink CDC 越來(lái)越強(qiáng)大,支持更多的數(shù)據(jù)庫(kù)和功能。也再次感謝云邪老師對(duì)于我們項(xiàng)目上線的大力支持!

    總結(jié)

    以上是生活随笔為你收集整理的Flink SQL CDC 13 条生产实践经验的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。