日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

精通Flink项目优化(一.资源配置调优)

發布時間:2023/12/20 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 精通Flink项目优化(一.资源配置调优) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

資源配置調優

Flink 性能調優的第一步,就是為任務分配合適的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。
提交方式主要是 yarn-per-job,資源的分配在使用腳本提交 Flink 任務時進行指定。

內存設置

生產資源配置:

bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ 指定并行度 -Dyarn.application.queue=test \ 指定 yarn 隊列 -Djobmanager.memory.process.size=2048mb \ JM2~4G 足夠 -Dtaskmanager.memory.process.size=6144mb \ 單個 TM2~8G 足夠 -Dtaskmanager.numberOfTaskSlots=2 \ 與容器核數 1core:1slot 或 1core:2slot -c com.atguigu.app.dwd.LogBaseApp \ /opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar

Flink 是實時流處理,關鍵在于資源情況能不能抗住高峰時期每秒的數據量,通常用QPS/TPS 來描述數據情況。

并行度設置

  • 最優并行度計算
    開發完成后,先進行壓測。任務并行度給 10 以下,測試單個并行度的處理上限。

總 QPS/單并行度的處理能力 = 并行度

不能只從 QPS 去得出并行度,因為有些字段少、邏輯簡單的任務,單并行度一秒處理幾萬條數據。而有些數據字段多,處理邏輯復雜,單并行度一秒只能處理 1000 條數據。

最好根據高峰期的 QPS 壓測,并行度*1.2 倍,富余一些資源。

  • Source端并行度配置
    數據源端是 Kafka,Source 的并行度設置為 Kafka 對應 Topic 的分區數。
    如果已經等于 Kafka 的分區數,消費速度仍跟不上數據生產速度。

考慮下 Kafka 要擴大分區,同時調大并行度等于分區數。

Flink 的一個并行度可以處理一至多個分區的數據,如果并行度多于 Kafka 的分區數,那么就會造成有的并行度空閑,浪費資源。

  • Transform 端并行度的配置
  • Keyby 之前的算子
    一般不會做太重的操作,都是比如 map、filter、flatmap 等處理較快的算子,并行度可以和 source 保持一致。
  • Keyby 之后的算子
  • 如果并發較大,建議設置并行度為 2 的整數次冪,例如:128、256、512;

    小并發任務的并行度不一定需要設置成 2 的整數次冪;
    大并發任務如果沒有 KeyBy,并行度也無需設置為 2 的整數次冪;

    • Sink 端并行度的配置
      Sink 端是數據流向下游的地方,可以根據 Sink 端的數據量及下游的服務抗壓能力進行評估。
      如果 Sink 端是 Kafka,可以設為 Kafka 對應 Topic 的分區數。
      Sink 端的數據量小,比較常見的就是監控告警的場景,并行度可以設置的小一些。
      Source 端的數據量是最小的,拿到 Source 端流過來的數據后做了細粒度的拆分,數據量不斷的增加,到 Sink 端的數據量就非常大。那么在 Sink 到下游的存儲中間件的時候就需要提高并行度。
      另外 Sink 端要與下游的服務進行交互,并行度還得根據下游的服務抗壓能力來設置,
      如果在 Flink Sink 這端的數據量過大的話,且 Sink 處并行度也設置的很大,但下游的服務完全撐不住這么大的并發寫入,可能會造成下游服務直接被寫掛,所以最終還是要在 Sink 處的并行度做一定的權衡。

    RocksDB 大狀態調優

    RocksDB 是基于 LSM Tree 實現的(類似 HBase),寫數據都是先緩存到內存中,所以 RocksDB 的寫請求效率比較高。RocksDB 使用內存結合磁盤的方式來存儲數據,每次獲取數據時,先從內存中 blockcache 中查找,如果內存中沒有再去磁盤中查詢。優化后差不多單并行度 TPS 5000 record/s,性能瓶頸主要在于 RocksDB 對磁盤的讀請求,所以當處理性能不夠時,僅需要橫向擴展并行度即可提高整個 Job 的吞吐量。以下幾個調優參數:

  • 設置本地 RocksDB 多目錄
    在 flink-conf.yaml 中配置:
  • state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb

    注意:不要配置單塊磁盤的多個目錄,務必將目錄配置到多塊不同的磁盤上,讓多塊磁盤來
    分擔壓力

  • state.backend.incremental:開啟增量檢查點,默認 false,改為 true。
  • state.backend.rocksdb.predefined-options:
    SPINNING_DISK_OPTIMIZED_HIGH_MEM 設置為機械硬盤+內存模式,有條件上SSD,指定為 FLASH_SSD_OPTIMIZED
  • state.backend.rocksdb.block.cache-size: 整 個 RocksDB 共 享 一 個 block cache,讀數據時內存的 cache 大小,該參數越大讀數據時緩存命中率越高,默認大小為 8 MB,建議設置到 64 ~ 256 MB。
  • state.backend.rocksdb.thread.num: 用于后臺 flush 和合并 sst 文件的線程數,默認為 1,建議調大,機械硬盤用戶可以改為 4 等更大的值。
  • state.backend.rocksdb.writebuffer.size: RocksDB 中,每個 State 使用一個Column Family,每個 Column Family 使用獨占的 write buffer,建議調大,例如:32M
  • state.backend.rocksdb.writebuffer.count: 每 個 Column Family 對 應 的writebuffer 數目,默認值是 2,對于機械磁盤來說,如果內存?夠大,可以調大到 5 左右
  • state.backend.rocksdb.writebuffer.number-to-merge: 將數據從 writebuffer中 flush 到磁盤時,需要合并的 writebuffer 數量,默認值為 1,可以調成 3。
  • state.backend.local-recovery: 設置本地恢復,當 Flink 任務失敗時,可以基于本地的狀態信息進行恢復任務,可能不需要從 hdfs 拉取數據
  • Checkpoint 設置

    開發建議:

    一般我們的 Checkpoint 時間間隔可以設置為分鐘級別,例如 1 分鐘、3 分鐘,對于狀態很大的任務每次 Checkpoint 訪問 HDFS 比較耗時,可以設置為 5~10 分鐘一次Checkpoint,并且調大兩次 Checkpoint 之間的暫停間隔,例如設置兩次 Checkpoint 之間至少暫停 4 或 8 分鐘。
    如果 Checkpoint 語義配置為 EXACTLY_ONCE,那么在 Checkpoint 過程中還會存
    在 barrier 對齊的過程,可以通過 Flink Web UI 的 Checkpoint 選項卡來查看
    Checkpoint 過程中各階段的耗時情況,從而確定到底是哪個階段導致 Checkpoint 時間
    過長然后針對性的解決問題。

    flink-conf.yaml 指定RocksDB的相關參數,也可以job的代碼中調用api指定

    // 使? RocksDBStateBackend 做為狀態后端,并開啟增量 Checkpoint RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints", true); env.setStateBackend(rocksDBStateBackend); // 開啟 Checkpoint,間隔為 3 分鐘 env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3)); // 配置 Checkpoint CheckpointConfig checkpointConf = env.getCheckpointConfig(); checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 最小間隔 4 分鐘 checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4)) // 超時時間 10 分鐘 checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10)); // 保存 checkpoint checkpointConf.enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    Flink ParameterTool 讀取配置

    實際開發中,有各種環境(開發、測試、預發、生產),作業也有很多的配置:算子的并行度配置、Kafka 數據源的配置(broker 地址、topic 名、group.id)、Checkpoint 是否開啟、狀態后端存儲路徑、數據庫地址、用戶名和密碼等各種各樣的配置,可能每個環境的這些配置對應的值都是不一樣的。
    Flink 中可以通過使用 ParameterTool 類讀取配置,它可以讀取環境變量、運行參數、配置文件。

    ParameterTool 是可序列化的,可以將它當作參數進行傳遞給算子的自定義函數類。

    • 讀取運行參數
      可以在 Flink 的提交腳本添加運行參數,格式:

    –參數名 參數值
    -參數名 參數值

    Flink 程序中可以直接使用 ParameterTool.fromArgs(args) 獲取到所有的參數,也可以通過 parameterTool.get(“username”) 方法獲取某個參數對應的值。
    舉個栗子:通過運行參數執行jobname

    bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ 指定并行度 -Dyarn.application.queue=test \ 指定 yarn 隊列 -Djobmanager.memory.process.size=1024mb \ 指定 JM 的總進程大小 -Dtaskmanager.memory.process.size=1024mb \ 指定每個 TM 的總進程大小 -Dtaskmanager.numberOfTaskSlots=2 \ 指定每個 TM 的 slot 數 -c com.atguigu.app.dwd.LogBaseApp \ /opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar \ --jobname dwd-LogBaseApp //參數名自己隨便起,代碼里對應上即可

    在代碼里獲取參數值:

    ParameterTool parameterTool = ParameterTool.fromArgs(args); String myJobname = parameterTool.get("jobname"); //參數名對應 env.execute(myJobname);
    • 讀取系統屬性
      ParameterTool 還?持通過 ParameterTool.fromSystemProperties() 方法讀取系統屬性。做個打印:
    ParameterTool parameterTool = ParameterTool.fromSystemProperties(); System.out.println(parameterTool.toMap().toString());
    • 讀取配置文件
      可 以 使 用 ParameterTool.fromPropertiesFile(“/application.properties”) 讀 取properties 配置文件??梢詫⑺幸渲玫牡胤?#xff08;比如并行度和一些 Kafka、MySQL 等配置)都寫成可配置的,然后其對應的 key 和 value 值都寫在配置文件中,最后通過ParameterTool 去讀取配置文件獲取對應的值。
    • 注冊全局參數
      在 ExecutionConfig 中可以將 ParameterTool 注冊為全作業參數的參數,這樣就可以被 JobManager 的 web 端以及用戶?定義函數中以配置值的形式訪問。
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

    可以不用將 ParameterTool 當作參數傳遞給算子的自定義函數,直接在用戶?定義的Rich 函數中直接獲取到參數值了。

    env.addSource(new RichSourceFunction() { @Override public void run(SourceContext sourceContext) throws Exception { while (true) { ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameter s();}} @Override public void cancel() { } })

    壓測方式

    先在 kafka 中積壓數據,之后開啟 Flink 任務,出現反壓,就是處理瓶頸。相當于水庫先積水,一下子泄洪。數據可以是自己造的模擬數據,也可以是生產中的部分數據

    總結

    以上是生活随笔為你收集整理的精通Flink项目优化(一.资源配置调优)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。