日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Apache Beam和BigQuery的错误处理(Java SDK)

發布時間:2025/6/17 java 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Beam和BigQuery的错误处理(Java SDK) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

設計管道
假設我們有一個簡單的場景:事件正在流向Kafka,我們希望使用管道中的事件,進行一些轉換并將結果寫入BigQuery表,以使數據可用于分析。

可以在作業開始之前創建BigQuery表,或者Beam本身可以創建它。

代碼看起來很簡單:

EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()

。作為(EventsProcessingOptions。類);

管道 p = 管道。創造(選項);
PCollection tableRows =

//閱讀kafka主題p。apply(“kafka-topic-read”,kafkaReader)。申請(“海邊的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())。通過(記錄 - > 記錄。getKV。()的getValue()))//將值轉換為JsonNode。申請(“字符串到JSON” ,ParseJsons。的(JsonNode。類))//創建TableRow。申請(“建設-表行”,帕爾多。的(新 EventsRowFn()))//將表格行保存到BigQuery。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()。到(tableSpec)。withCreateDisposition(BigQueryIO。寫。createDisposition會。CREATE_NEVER)。withWriteDisposition(BigQueryIO。收件。writeDisposition會。WRITE_APPEND);

少了什么東西?
在現實世界中,可能會發生錯誤,在大多數情況下,我們將需要處理它們。

在上面的管道中,當我們嘗試將事件從Kafka解析為JsonNode,轉換期間以及BigQuery插入階段時,可能會發生錯誤。

錯誤處理計劃
對于每個錯誤,我們將在不同的BigQuery表中創建一行,其中包含更多信息,例如來自Kafka的origin事件。

一旦發生錯誤,我們就可以分析錯誤記錄并全面了解它。

然后,我們可以修復管道代碼,重置/更改Kafka使用者組偏移,并再次使用固定代碼重播事件。

我們還可以修復事件本身(例如,在JSON解析錯誤中)并將其重新發送到Kafka。

處理轉換錯誤
讓我們快速瀏覽一下我們的轉換函數:

@ProcessElement
public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
TableRow convertedRow = new TableRow();
insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);
insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);
//更多轉變來
背景。輸出(輸出);
}
private void insertLong(JsonNode value,String key,TableRow convertedRow){

String valueToInsert = value。asText();如果(valueToInsert != 空 && !valueToInsert。的isEmpty()){long longValue = Long。parseLong(valueToInsert);convertedRow。set(key,longValue);}

}
private void insertFloat(JsonNode value,String key,TableRow convertedRow){

String valueToInsert = getStringValue(value);if(valueToInsert != null){float floatValue = Float。parseFloat(valueToInsert);convertedRow。set(key,floatValue);}

}
是的,我們可能在解析過程中失敗,因為我們將字符串解析為Float / Long,并且這對無法轉換的數據失敗。

我們需要從主函數輸出中排除失敗的數據并將這些數據發送到管道中的不同路徑,然后我們將它們保存到BigQuery中的錯誤表中。

怎么樣?讓我們使用標簽
當我們在ParDo 函數末尾輸出一個元素時 ,我們可以在一個標簽內輸出它。然后我們可以獲取所有標記為特定名稱的元素,并對它們執行一些處理。

這里我們將使用兩個標簽,一個是MAIN標簽,它包含所有成功的記錄,另一個包含所有錯誤和一些上下文,例如 DEADLETTER_OUT。

該主標記必須與ParDo 函數本身的OUTPUT類型相同,并且 所有其他標記可以是不同類型。

現在,我們的 ParDo 函數將如下所示(注意標記添加):

@ProcessElement
public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
public static final TupleTag < JsonNode > MAIN_OUT = new TupleTag < JsonNode >(){};
public static final TupleTag < BigQueryProcessError > DEADLETTER_OUT = new TupleTag < BigQueryProcessError >(){};
TableRow convertedRow = new TableRow();
嘗試 {

insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);//更多轉變來背景。輸出(輸出);

} catch(例外 e){

記錄器。誤差(“失敗變換” + ?。的getMessage(),ê);背景。輸出(DEADLETTER_OUT,新 BigQueryProcessError(convertedRow。的toString(),ê。的getMessage(),ERROR_TYPE。BQ_PROCESS,originEvent));

}
}
我們如何通過標簽處理元素?讓我們改變管道,并進行拆分。該 MAIN 元素將大量查詢表和 DEADLETTER_OUT 內容將被發送到錯誤表。

EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()

。作為(EventsProcessingOptions。類);

管道 p = 管道。創造(選項);
PCollectionTuple tableRows =

//閱讀kafka主題p。apply(“kafka-topic-read”,kafkaReader)。申請(“海邊的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())。通過(記錄 - > 記錄。getKV。()的getValue()))//將值轉換為JsonNode。申請(“字符串到JSON” ,ParseJsons。的(JsonNode。類))//創建TableRow。申請(“建設-表行”,帕爾多。的(新 EventsRowFn())。withOutputTags(MAIN_OUT,TupleTagList。的(DEADLETTER_OUT)));//將MAIN標簽保存到BQtableRows。得到(MAIN_OUT)。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()。到(tableSpec)。withCreateDisposition(BigQueryIO。寫。createDisposition會。CREATE_NEVER)。withWriteDisposition(BigQueryIO。收件。writeDisposition會。WRITE_APPEND);//將DEADLETTER_OUT保存到BQ錯誤表tableRows。得到(DEADLETTER_OUT)。申請(“BQ-進程的錯誤提取物”,帕爾多。的(新 BigQueryProcessErrorExtracFn()))。申請(“BQ-進程的錯誤寫”,BigQueryIO。writeTableRows()。to(errTableSpec)。withJsonSchema(errSchema)。withCreateDisposition(BigQueryIO。寫。createDisposition會。CREATE_IF_NEEDED)。withWriteDisposition(BigQueryIO。收件。writeDisposition會。WRITE_APPEND));

p。run();
處理BigQuery插入錯誤
為了在BigQuery插入期間處理錯誤,我們必須使用BiqQueryIO API。

讓我們放大寫入階段。并稍微改變一下:

WriteResult writeResult = tableRowToInsertCollection

。申請(“BQ-寫”,BigQueryIO。寫()//指定將返回失敗的行及其錯誤。withExtendedErrorInfo()。到(tableSpec)。withCreateDisposition(BigQueryIO。寫。createDisposition會。CREATE_NEVER)。withWriteDisposition(BigQueryIO。寫。writeDisposition會。WRITE_APPEND)//指定處理失敗插入的策略。。withFailedInsertRetryPolicy(InsertRetryPolicy。retryTransientErrors()));

//將失敗的行及其錯誤寫入錯誤表
寫結果

。getFailedInsertsWithErr()。申請(窗口。到(FixedWindows。的(持續時間。standardMinutes(5))))。申請(“BQ-插入錯誤提取物”,帕爾多。的(新 BigQueryInsertErrorExtractFn(tableRowToInsertView))。withSideInputs(tableRowToInsertView))。申請(“BQ-插入錯誤寫”,BigQueryIO。writeTableRows()。to(errTableSpec)。withJsonSchema(errSchema)。withCreateDisposition(BigQueryIO。寫。createDisposition會。CREATE_IF_NEEDED)。withWriteDisposition(BigQueryIO。收件。writeDisposition會。WRITE_APPEND));

在上面的代碼片段中,我們從BigQueryIO獲取失敗的TableRows及其錯誤。現在我們可以將它們轉換為另一個 TableRow 并將它們寫入錯誤表。在這種情況下,我們讓作業在需要時創建表。

總結

以上是生活随笔為你收集整理的Apache Beam和BigQuery的错误处理(Java SDK)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。