日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

mysql kafka binlog_为什么使用kafka处理mysql binlog?

發布時間:2023/12/20 数据库 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mysql kafka binlog_为什么使用kafka处理mysql binlog? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在開發 Spark Streaming 的公共組件過程中,需要將 binlog 的數據(Array[Byte])轉換為 Json 格式,供用戶使用,本文提供一種轉換的思路。另外我們會用到幾個輔助類,為了行文流暢,我們將輔助類的定義放在文章的最后面。如果如果本文有講述不詳細,或者錯誤指出,肯請指出,謝謝對于 binlog 數據,每一次操作(INSERT/UPDATE/DELETE 等)都會作為一條記錄寫入 binlog 文件,但是同一條記錄可能包含數據庫中的幾行數據(這里比較繞,可以看一個具體的例子)在數據庫中,有 id, name 兩個字段,其中 id 為主鍵,name 隨意, age 隨意。有兩行數據如下idnameage

1john30

2john40

那么你進行操作

update table set age = 50 where name = john的時候,就會將兩行的數據都進行更改,這兩行更改的數據會在同一個 binlog 記錄中,這一點會在后面的實現中有體現。

下面,我們給出具體的代碼,然后對代碼進行分析def desirializeByte(b: (String, Array[Byte])) : (String, String) = {val binlogEntry = BinlogEntryUtil.serializeToBean(b._2) //將 Array[Byte] 數據轉換成 com.meituan.data.binlog.BinlogEntry 類,相關類定義參考附錄val pkeys = binlogEntry.getPrimaryKeys.asScala //獲取主鍵,這里的 asScala 將 Java 的 List 轉換為 Scala 的 Listval rowDatas : List[BinlogRow] = binlogEntry.getRowDatas.asScala.toList //獲取具體的信息val strRowDatas = rowDatas.map(a => { //將獲取到的具體信息進行轉換,這里主要是將沒一條信息的內容,轉換 [(K1:V1,K2:V2...Kn:Vn)] 的形式,方面后面進行 Json 化val b = a.getBeforeColumns.asScala //獲取 beforColumnsval c = a.getAfterColumns.asScala //獲取 afterColumnsval mb = b.map(d => (d._1, d._2.getValue)) //去掉所有不需要的信息,只保留每個字段的值val mc = c.map(c => (c._1, c._2.getValue)) //去掉所有不需要的信息,只保留每個字段的值(mb, mc) //返回轉換后的 beforeColumns 和 afterColumns})

//下面利用 json4s 進行 Json 化

(binlogEntry.getEventType, compact("rowdata" -> strRowDatas.map{w => List("row_data" -> ("before" -> w._1.toMap) ~ ("after" -> w._2.toMap)) //這里的兩個 toMap 是必要的,不然里層會變成 List,這個地方比較疑惑的是,//w._1 按理是 Map類型,為什么還需要強制轉換成 Map//而且用 strRowDatas.foreach(x => println(s"${x._1} ${x._2}")打印的結果表名是 Map}))

desirializeByte 函數傳入 topic 中的一條記錄,返回參數自己確定,我這里為了測試,返回一個 (String, String) 的 Tuple,第一個字段表示該條記錄的 EventType(Insert/Update/Delete 等),第二個字段為 Json 化后的數據。

BinlogEntryUtil.serilizeToBean 是一個輔助類,將 binlog 數據轉化為一個 Java bean 類。

第 4 行,我們得到表對應的主鍵,第 5 行獲得具體的數據第 6 行到第 12 行是 Json 化之前的輔助工作,將所有不需要的東西給剔除掉,只留下字段,以及字段對應的值。

第 14, 15 行就是具體的 Json 工作了(使用了 json4s 包進行 Json 化)這個過程中有一點需要注意的是,在 Json 化的時候,記得為 w._1 和 w._2 加 toMap 操作,不然會變成 List(很奇怪,我將 w._1 和 w._2 打印出來看,都是 Map 類型)或者你可以在第 7,8 行的末尾加上 .toMap 操作。這個我查了 API,進行了實驗,暫時懷疑是在和 json4s 組合的時候,出現了問題,有待驗證。

利用上述代碼,我們可以得到下面這樣 Json 化之后的字符串(我進行了排版,程序返回的 Json 串是不換行的){"rowdata":

[{"row_data":

{"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},"after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}

}

}]

}"

到這里,基本就完成了一種將 binlog 數據 Json 化的代碼。

附錄代碼,由于這些代碼是從其他工程里面摳出來的,可能讀起來會不順暢,還請見諒。

public static BinlogEntryserializeToBean(byte[] input) {BinlogEntrybinlogEntry = null;

Entryentry = deserializeFromProtoBuf(input);//從 protobuf 反序列化if(entry != null) {

binlogEntry = serializeToBean(entry);

}

return binlogEntry;

}

public static EntrydeserializeFromProtoBuf(byte[] input) {Entryentry = null;

try {

entry = Entry.parseFrom(input);

//com.alibaba.otter.canal.protocol.CanalEntry#Entry 類的方法,由 protobuf 生成} catch (InvalidProtocolBufferExceptionvar3) {logger.error("Exception:" + var3);

}

return entry;

}

//將 Entry 解析為一個 bean 類

public static BinlogEntryserializeToBean(Entryentry) {RowChangerowChange = null;

try {

rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (Exceptionvar8) {

throw new RuntimeException("parse event has an error , data:" + entry.toString(), var8);}

BinlogEntrybinlogEntry = new BinlogEntry();String[] logFileNames = entry.getHeader().getLogfileName().split("\\.");String logFileNo = "000000";

if(logFileNames.length > 1) {

logFileNo = logFileNames[1];

}

binlogEntry.setBinlogFileName(logFileNo);binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset());binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime());binlogEntry.setTableName(entry.getHeader().getTableName());binlogEntry.setEventType(entry.getHeader().getEventType().toString());IteratorprimaryKeysList = rowChange.getRowDatasList().iterator();while(primaryKeysList.hasNext()) {

RowDatarowData = (RowData)primaryKeysList.next();BinlogRowrow = new BinlogRow(binlogEntry.getEventType());row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()));row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()));binlogEntry.addRowData(row);

}

if(binlogEntry.getRowDatas().size() >= 1) {BinlogRowprimaryKeysList1 = (BinlogRow)binlogEntry.getRowDatas().get(0);binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1));} else {

ArrayListprimaryKeysList2 = new ArrayList();binlogEntry.setPrimaryKeys(primaryKeysList2);}

return binlogEntry;

}

public class BinlogEntry implements Serializable {private String binlogFileName;

private long binlogOffset;

private long executeTime;

private String tableName;

private String eventType;

private List primaryKeys;

private List rowDatas = new ArrayList();}

public class BinlogRow implements Serializable {public static final String EVENT_TYPE_INSERT = "INSERT";public static final String EVENT_TYPE_UPDATE = "UPDATE";public static final String EVENT_TYPE_DELETE = "DELETE";private String eventType;

private Map beforeColumns;private Map afterColumns;}

public class BinlogColumn implements Serializable {private int index;

private String mysqlType;

private String name;

private boolean isKey;

private boolean updated;

private boolean isNull;

private String value;

}

總結

以上是生活随笔為你收集整理的mysql kafka binlog_为什么使用kafka处理mysql binlog?的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。