canal mysql 数据同步
首先canal是什么呢?
????canal是阿里巴巴旗下的一款開源項目,純Java開發。基于數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL;簡單來說,canal 會將自己偽裝成 MySQL 從節點(Slave),并從主節點(Master)獲取 Binlog,解析后供消費端使用。canal 包含兩個組成部分:服務端和客戶端。服務端負責連接至不同的 MySQL 實例,并為每個實例維護一個事件消息隊列;客戶端則可以訂閱這些隊列中的數據變更事件,然后根據這些事件的變更類型(更新、刪除等類型)進行相管處理,比如將變更的數據更新到redis或者發送變更通知到第三方等。
接下來我們來簡單構建一個單節點canal實例,并通過客戶訂閱獲取變更事件進行相關業務操作。
1.從github上下載最新穩定版本canal,我下載的是canal.deployer-1.0.25,解壓縮
2.創建mysql用戶并授權(canal的原理是模擬自己為mysql slave,所以這里一定需要做為mysql slave的相關權限?)
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;3.修改mysql的配置文件
canal的原理是基于mysql binlog技術,所以這里一定需要開啟mysql的binlog寫入功能,建議配置binlog模式為row.(ps. 目前canal已經支持mixed/statement/row模式)
[mysqld] log-bin=mysql-bin #添加這一行就ok binlog-format=ROW #選擇row模式 server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重復 更改配置(conf/example/instance.properties):
| ################################################# ## mysql serverId canal.instance.mysql.slaveId=1234 # position info canal.instance.master.address=10.0.2.30:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= # table meta tsdb info (暫時不了解,先關閉) canal.instance.tsdb.enable=false? canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb canal.instance.tsdb.dbUsername=canal canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position =? #canal.instance.standby.timestamp =? # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.defaultDatabaseName=xxx_test canal.instance.connectionCharset=UTF-8 # table regex canal.instance.filter.regex=xxx_test.tb_score_info,xxx_test.tb_company_score_info # table black regex canal.instance.filter.black.regex= ################################################# |
以上紅色標注信息需要重點關注,
canal.instance.master.address為數據庫ip:port
canal.instance.dbUsername 數據庫用戶名
canal.instance.dbPassword 數據庫密碼
canal.instance.defaultDatabaseName 數據庫名稱
canal.instance.filter.regex 表示需要訂閱發生變更的表名稱,可以添加多個逗號隔開即可
4.啟動canal服務(/bin/startup.sh),jps查看CanalLauncher進程是否存在
5.編寫客戶端代碼
主入口代碼:
/** * @author xiaofeng * @version V1.0 * @title: DataSyncHandle.java * @package: com.xx.xx.app * @description: 數據同步處理 * @date 2018/3/5 0005 下午 7:47 */ public class DataSyncHandle extends BaseSpringApp {protected static final Logger logger = LoggerFactory.getLogger(DataSyncHandle.class); CanalClient canalClient; public DataSyncHandle(String[] args) {super(args, "classpath:dataSync-app-handle.xml"); this.canalClient = SpringContextManager.getBean(CanalClient.class); }public void run() {canalClient.connect(); }public static void main(String[] args) {int status = 1; try {start(args); status = 0; } catch (Throwable e) {logger.error("data sync exception.", e); logger.info("program exception exit! restarting......"); start(args); } finally {System.exit(status); logger.info("data sync handle exit......"); }}private static void start(String[] args) {DataSyncHandle app = new DataSyncHandle(args); app.run(); }
服務連接訂閱:
/** * @author xiaofeng * @version V1.0 * @title: CanalClient.java * @package: com.xx.xx.app.canal * @description: TODO * @date 2018/2/11 16:13 */ public class CanalClient {static Logger logger = LoggerFactory.getLogger(CanalClient.class); @Autowired ScoreInfoService scoreInfoService; @Value("${canal.server.host}")String hostName = "10.0.2.23"; @Value("${canal.server.port}")int port = 11111; @Value("${canal.server.destination}")String destination = "example"; @Value("${canal.server.username}")String username; @Value("${canal.server.password}")String password; public void connect() {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostName, port), destination, username, password); connector.connect(); connector.subscribe(Constant.TABLE); while (true) {Message message = null; try {message = connector.getWithoutAck(100); } catch (Exception e) {logger.error(e.getMessage(), e); }long batchId = message.getId(); if (batchId == -1 || message.getEntries().isEmpty()) {try {Thread.sleep(3000); } catch (InterruptedException e) {e.printStackTrace(); }} else {handle(message.getEntries()); connector.ack(batchId); }}}private void handle(List<CanalEntry.Entry> entries) {entries.stream().forEach(entry -> {String tableName = entry.getHeader().getTableName(); if (StringUtils.isNoneBlank(tableName)) {logger.info("table name:" + tableName); try {switch (TableEnum.getTableName(tableName)) {case TB_SCORE_INFO:scoreInfoService.userScoreReloadToCache(entry); break; default:break; }} catch (Exception e) {e.printStackTrace(); }}}); } }注意點:connector.subscribe(Constant.TABLE);表示需要訂閱發生變更的表名稱,可以添加多個逗號隔開即可,需要與canal.instance.filter.regex配置中的一致,否則會被覆蓋
獲取變更事件具體業務處理:
/** * @author xiaofeng * @version V1.0 * @title: ScoreInfoServiceImpl.java * @package: com.xx.xx.app.service.impl * @description: 積分服務 * @date 2018/3/6 0006 下午 1:36 */ public class ScoreInfoServiceImpl extends BaseService implements ScoreInfoService {Logger logger = LoggerFactory.getLogger(getClass()); @Autowired ScoreInfoMapper scoreInfoMapper; @Autowired RedisExtendClient redisExtendClient; /** * 用戶積分重載 * * @param entry */ @Override public void userScoreReloadToCache(CanalEntry.Entry entry) {CanalEntry.RowChange rowChange = null; try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) {e.printStackTrace(); }for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (rowChange.getEventType() == CanalEntry.EventType.INSERT || rowChange.getEventType() == CanalEntry.EventType.UPDATE) {logger.info("[operation = insert or update] "); Map<String, String> values = convertToBeanMap(rowData.getAfterColumnsList()); ScoreInfo scoreInfo = JsonUtils.json2Obj(JsonUtils.obj2Json(values), new TypeReference<ScoreInfo>() {}); logger.info("scoreInfo: " + JsonUtils.obj2Json(scoreInfo)); BigDecimal currentDeposit = scoreInfo.getRechargeDeposit().add(scoreInfo.getPresentDeposit()).add(scoreInfo.getProfitDeposit()).add(scoreInfo.getInviteDeposit()).subtract(scoreInfo.getBlockingPoint()); BigDecimal formatValue = DataFormatUtil.defaultDecimalFormat(currentDeposit); redisExtendClient.hset(Constant.SCORE, Constant.USER_SCORE + scoreInfo.getId(), String.valueOf(formatValue), Constant.SCORE_DBINDEX); } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {String columnId = getColumnId(rowData, "user_id"); if (StringUtils.isBlank(columnId)) {return; }logger.info("[operation = delete]"); logger.info("id=[" + columnId + "]"); redisExtendClient.hdel(Constant.SCORE, Constant.USER_SCORE + columnId, Constant.SCORE_DBINDEX); }}} }
啟動客戶端,手動去數據庫對訂閱的表變更一個字段即可,然后查看客戶端控制臺發現已經獲取到了變更事件信息,我的業務代碼處理邏輯會把變更后的數據更新到redis中,當然你也可以有其它操作哦:
好啦,至此我們的簡單版canal數據同步已經搭建成功了哦,可以開始你的數據同步之旅了哦!
總結
以上是生活随笔為你收集整理的canal mysql 数据同步的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java kafka 多线程消费
- 下一篇: springboot data.redi