日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

sql server cdc 清理_基于CDC技术的ElasticSearch索引同步机制

發布時間:2023/12/15 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 sql server cdc 清理_基于CDC技术的ElasticSearch索引同步机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概述

ElasticSearch作為一個基于Lucene的搜索引擎被廣泛應用于各種應用系統,比如電商、新聞類、咨詢類網站。在使用ElasticSearch開發應用的過程中,一個非常重要的過程是將數據導入到ElasticSearch索引中建立文檔。在一開始系統規模比較小時,我們可以使用logstash來同步索引。logstash的好處是開方量少,只要進行編寫簡單的索引模板和同步sql,就能快速搭建索引同步程序。但是隨著應用數據規模的變大,索引變化變得非常頻繁。logstash的缺點也隨著暴露,包括(1)不支持刪除,只能通過修改字段屬性軟刪除,隨著應用使用時間的增長,ElasticSearch中會留存大量的無用數據,拖慢搜索速度。(2)sql分頁效率低,sql查詢慢。logstash的分頁邏輯是先有一個大的子查詢,然后再從子查詢中分頁獲取數據,因此效率低下,當數據庫數據量大時,一個分頁查詢就需要幾百秒。同步幾千萬數據可能需要1天時間。因此我們決定放棄使用logstash,而改用使用canal來搭建基于CDC技術的ElasticSearch索引同步機制。

系統架構設計

如圖所示,索引同步系統由幾個部分組成,下面分點介紹。

(1)數據庫

原始數據數據庫

(2)Canal

Canal是阿里云開源的MySql數據庫增量數據訂閱和消費工具。它的實現原理是將自己偽裝為一個MySQL slave,向MySql master發送dump協議;MySQL master收到dump請求,開始推送binary log給slave,canal解析binary log對象。

(3)Canal Client

Canal Client是自己實現的程序,通過從Canal Server中獲取經過Canal解析之后的數據庫binlog日志,做相應的業務邏輯處理。在本文介紹的基于CDC的索引同步系統中,Canal Client訂閱搜索相關的數據庫表的binlog日志,如果跟數據搜索相關的數據發生變化時,就向Rabbit發一條消息,表明數據發生變化了,通知同步Worker從MySQL同步數據到ES。

(4)RabbitMQ

消息隊列,也可以選用Kafaka等其他消息隊列,根據具體業務確定。

(5)索引同步Worker

Worker從消息隊列中消費數據,根據消息從MySQL獲取相應的數據并同步到ElasticSearch中。

Canal Client實現

Canal Client從Canal Server中獲取binlog日志,并根據業務需求進行處理。以下通過一些關鍵代碼介紹Canal Client的實現。

(1)在pom中添加Canal client的依賴。

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency>

(2)初始化Canal連接

CanalConfig包含了Canal的配置信息。CanalConnector為canal-client包中的類,我們通過這個類來連接server,獲取binlog,關閉server。該服務基于SpringBoot。因此init會在CanalClientInitializer bean被創建時被調用,preDestory會在服務關閉,CanClientInitializer被銷毀時被調用。

@Component @Slf4j public class CanalClientInitializer {CanalConfig canalConfig;CanalConnector connector;CanalDataProcessor canalDataProcessor;public CanalClientInitializer(@Autowired CanalConfig canalConfig, @Autowired CanalDataProcessor canalDataProcessor) {this.canalConfig = canalConfig;this.canalDataProcessor = canalDataProcessor;}@PostConstructpublic void init() throws InterruptedException {connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()), canalConfig.getDestination(), "", "");//建立連接connector.connect();//訂閱相關的表connector.subscribe(canalConfig.getSyncTable());canalDataProcessor.process(connector);}@PreDestroypublic void preDestroy() {log.info("stop the canal client");canalDataProcessor.stopProcess();}}

(3)CanalDataProcessor獲取并處理binlog

@Component @Slf4j public class CanalDataProcessor {boolean isRunning;RabbitTemplate rabbitTemplate;TableChangeProcessor tableChangeProcessor;public CanalDataProcessor(@Autowired RabbitTemplate rabbitTemplate, @Autowired TableChangeProcessor processor) {this.rabbitTemplate = rabbitTemplate;this.tableChangeProcessor = processor;}@Asyncpublic void process(CanalConnector connector) throws InterruptedException {isRunning = true;while (isRunning) {try {//獲取消息Message message = connector.getWithoutAck(100, 10L, TimeUnit.SECONDS);//業務處理邏輯processMessage(message);//消息被成功執行,向Canal Server發送ack消息通知server該message已經被處理完成connector.ack(message.getId());} catch (Exception e) {log.error("wtf", e);//當消息沒被成功處理完成時進行回滾,下次能夠重新獲取該Messageconnector.rollback();Thread.sleep(1000);}}connector.disconnect();}public void stopProcess() {isRunning = false;}private void processMessage(Message message) {for(Entry entry : message.getEntries()) {try {tableChangeProcessor.process(entry);} catch (Exception e) {log.error("wtf", e);continue;}}} }

(4)TableChangeProcessor

TableChangeProcessor中為具體的業務邏輯,處理Message,獲取跟搜索相關的數據變化,發送相應的消息到消息隊列中。

注意點

(1)忽略搜索無關的數據字段變化,避免不必要的索引更新,降低服務器壓力。如Products表中有一個product_weight表示商品重量發生了變化,但其實商品重量跟搜索無關,那就不要關心這個變化。

(2)對于搜索中不會出現的數據,不要寫入到ES中,比如電商商品中的下架商品,另外,如果商品被下架,則要進行監聽通知索引同步Worker從es中刪除索引文檔。這樣能夠降低ES中總的索引文檔數量,提升搜索效率。

(3)要考慮Rabbit掛掉或者隊列寫滿,消息無法寫入的情況;首先應該在Rabbit發送消息時添加重試,其次應該在重試幾次還是失敗的情況下拋出異常,canal消息流回滾,下次還是能夠獲取到這個數據變化的Canal消息,避免數據變動的丟失。

(4)注意目前Canal只支持單Client。如果要實現高可用,則需要依賴于ZooKeeper,一個Client作為工作Client,其余Client作為冷備,當工作Client掛掉時,冷備Client監聽到ZooKeeper數據變化,搶占鎖成為工作Client。

Canal Worker實現

索引同步Worker從消息隊列中獲取Canal Client發送的跟搜索相關的數據庫變化消息。舉個例子,比如商品表中跟搜索相關的字段發生了變化,Canal Client會發送以下一條數據:

{"change_id": "694212527059369984","change_type": 1, //商品發生變化"change_time": "1600741397" }

在Worker中監聽隊列消息:

@Component @Slf4j public class ProductChangeQueueListener {@Autowired@Qualifier("snake")ObjectMapper om;@AutowiredChangeEventHandlerFactory changeEventHandlerFactory;@RabbitListener(queues = RabbitConfig.PRODUCT_QUEUE_NAME, containerFactory = "customRabbitListenerContainerFactory")public void onChange(Message message) {ChangeEvent event = parse(message);if(event == null) {return;}changeEventHandlerFactory.handle(event);}private ChangeEvent parse(Message message) {ChangeEvent event = null;try {event = om.readValue(new String(message.getBody()), ChangeEvent.class);} catch (Exception e) {log.error("同步失敗,解析失敗", e);}return event;}}

ChangeEventHandlerFactory為事件處理器的工廠類。以下為一個事件處理器的實現。它監聽changeType為CHANGE_TYPE_OUT_PRODUCT的事件,從數據庫中獲取到變動的數據,構建ES的IndexRequest,并將Request存入到RequestBulkBuffer中,等待批量同步到ES中。有些同學可能會有疑問,為何不直接從Canal中獲取數據,主要原因是Canal中只包含了單表數據,但是索引文檔可能包含了多表的數據,因此還需要從MySQL獲取數據。如果索引文檔中只包含單表數據,可以考慮在ChangeEvent中包含修改之后的數據,索引同步Woker就不用再從MySql中再獲取一遍數據,提升Worker工作效率。

@Component @Slf4j public class OutProductEventHandler implements ChangeEventHandler {@AutowiredProductDao productDao;@AutowiredRequestBulkBuffer buffer;@AutowiredOutProductChangeRequestBuilder builder;@Override@Retryablepublic boolean handle(ChangeEvent changeEvent) {if (!match(changeEvent)) {return false;}Tuple dataTuple = productDao.getProductWithStore(changeEvent.getChangeId());if (dataTuple == null) {return true;}Product product = dataTuple.get(QProduct.product);Store store = dataTuple.get(QStore.store);IndexRequest request = null;try {request = builder.convertToUpdateQuery(getTimestampNow(), product, store);} catch (Exception e) {log.error("wtf", e);}if (request == null) {return true;}buffer.add(request);return true;}@Overridepublic boolean match(ChangeEvent changeEvent) {return ChangeEvent.CHANGE_TYPE_OUT_PRODUCT == changeEvent.getChangeType();} }

在上面的OutProductEventHandler類中,我們并不直接在該類中使用RestHighLevelClient將文檔更新到ES索引,而是將IndexRequest暫存到RequestBulkBuffer中。RestBulkBuffer使用CircularFifoBuffer作為存儲數據結構。

@Component public class RequestBulkBuffer {CircularFifoBuffer buffer;public RequestBulkBuffer(CircularFifoBuffer buffer) {this.buffer = buffer;}public void add(DocWriteRequest<?> request) {buffer.add(request);}}

CircularFifoBuffer是一個經過改造的環形隊列實現。允許多線程寫,在我們這個應用場景中只支持也只需支持單線程讀->處理->移除處理完的數據。當環形隊列緩存滿時,借助于semaphore,寫入線程將會被阻塞,在后面的Worker如何防止數據丟失中,我們來闡述為什么要這么做。

/*** 允許多線程寫* 只允許單線程->讀->處理->移除*/ public class CircularFifoBuffer {private Logger logger = LoggerFactory.getLogger(CircularFifoBuffer.class.getName());private transient Object[] elements;private transient int start = 0;private transient int end = 0;private transient boolean full = false;private final int maxElements;private ReentrantLock addLock;private Semaphore semaphore;public CircularFifoBuffer(int size) {if (size <= 0) {throw new IllegalArgumentException("The size must be greater than 0");}elements = new Object[size];maxElements = elements.length;addLock = new ReentrantLock();semaphore = new Semaphore(size);}public int size() {int size = 0;if (end < start) {size = maxElements - start + end;} else if (end == start) {size = (full ? maxElements : 0);} else {size = end - start;}return size;}public boolean isEmpty() {return size() == 0;}public boolean isFull() {return size() == maxElements;}public int maxSize() {return maxElements;}public void clear() {full = false;start = 0;end = 0;Arrays.fill(elements, null);}public boolean add(Object element) {if (null == element) {throw new NullPointerException("Attempted to add null object to buffer");}addLock.lock();try {semaphore.acquire();} catch (Exception e) {logger.error("RingBuffer", "線程退出,添加失敗");return false;}elements[end++] = element;if (end >= maxElements) {end = 0;}if (end == start) {full = true;}addLock.unlock();return true;}public Object get() {if (isEmpty()) {return null;}return elements[start];}public Object remove() {if (isEmpty()) {return null;}Object element = elements[start];if(null != element) {elements[start++] = null;if (start >= maxElements) {start = 0;}full = false;semaphore.release();}return element;}/*** @param size the max size of elements will return*/public Object[] get(int size) {int queueSize = size();if (queueSize == 0) { //emptyreturn new Object[0];}int realFetchSize = queueSize >= size ? size : queueSize;if (end > start) {return Arrays.copyOfRange(elements, start, start + realFetchSize);} else {if (maxElements - start >= realFetchSize) {return Arrays.copyOfRange(elements, start, start + realFetchSize);} else {return ArrayUtils.addAll(Arrays.copyOfRange(elements, start, maxElements),Arrays.copyOfRange(elements, 0, realFetchSize - (maxElements - start)));}}}public Object[] getAll() {return get(size());}public Object[] remove(int size) {if(isEmpty()) {return new Object[0];}int queueSize = size();int realFetchSize = queueSize >= size ? size : queueSize;Object [] retArr = new Object[realFetchSize];for(int i=0;i<realFetchSize;i++) {retArr[i] = remove();}return retArr;}}

下面這個類為緩存的消費者,它循環從buffer中獲取一定數據的數據,并使用RestHighLevelClient將數據批量同步到ES。在Worker啟動時,會創建一個線程調用startConsume,在服務關閉時該線程結束。

@Slf4j public class RequestBulkConsumer {private static final int DEFAULT_BULK_SIZE = 2000;private CircularFifoBuffer buffer;private EsBulkRequestService service;private boolean isRunning = false;private int bulkSize = DEFAULT_BULK_SIZE;public RequestBulkConsumer(CircularFifoBuffer buffer, RestHighLevelClient client) {this.buffer = buffer;this.service = new EsBulkRequestService(client);}public void setBulkSize(int size) {this.bulkSize = size;}public int getBulkSize() {return bulkSize;}public boolean isRunning() {return isRunning;}public void startConsume() {if(isRunning) {return;}isRunning = true;while(true) {if(!isRunning) {break;}Object [] items = buffer.get(bulkSize);if(items.length == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {break;}} else {List<DocWriteRequest<?>> requests = convert(items);try {BulkResponse response = service.request(requests);processResponse(response);buffer.remove(items.length);if (items.length < bulkSize) {Thread.sleep(3000);}} catch (InterruptedException e) {break;} catch (IOException e) {log.error("wtf", e);} catch (Exception e) {log.error("wtf", e);buffer.remove(items.length);}}}}private List<DocWriteRequest<?>> convert(Object [] items) {return Stream.of(items).map(i -> {if(i instanceof DocWriteRequest) {return (DocWriteRequest<?>) i;} else {return null;}}).filter(Objects::nonNull).collect(Collectors.toList());}public void stop() {isRunning = false;}private void processResponse(BulkResponse bulkResponse) {BulkItemResponse [] itemResponseArr = bulkResponse.getItems();for(BulkItemResponse resp : itemResponseArr) {DocWriteResponse docWriteResponse = resp.getResponse();if(docWriteResponse instanceof IndexResponse) {IndexResponse indexResponse = (IndexResponse) docWriteResponse;if(indexResponse.getResult() != Result.CREATED && indexResponse.getResult() != Result.UPDATED) {if(indexResponse.status() == RestStatus.CONFLICT) {continue;} else {log.error("索引更新失敗: {}, {}", indexResponse.getId(), resp.getFailureMessage());}}} else if(docWriteResponse instanceof DeleteResponse) {DeleteResponse deleteResponse = (DeleteResponse) docWriteResponse;if(deleteResponse.getResult() != Result.DELETED) {log.error("索引刪除失敗: {}, {}", deleteResponse.getId(), resp.getFailureMessage());}}}} }

以下為Worker的主要幾個類的代碼。在索引同步系統中,高可用并不是最重要的,因為我們的搜索本身是一個準實時系統,只需要保證最終一致性就可以了,我們主要需要避免的是數據變更的丟失。以下說明在Worker中是如何避免數據丟失的。

避免數據丟失

(1)如果Rabbit掛掉,沒關系,Canal Client那邊在Rabbit掛掉期間無法消費binlog,會等待Rabbit重啟之后再處理數據變化。Worker只要能做到Rabbit重啟之后重連就行。

(2)如果MySQL掛掉,則Worker無法從數據庫中獲取數據,則消息處理失敗,消息會堆積在Rabbit中。等MySQL重新上線之后,消息重新開始處理,數據也不會丟失。

(3)如果ES掛掉,則批量處理線程消費buffer中的數據時會失敗,buffer會被生產者填滿,由于CircularFifoBuffer在被填滿時使用了信號量阻塞生產者線程,消息又會被堆積在Rabbit中,等待ES重新上線之后,消息重新開始處理,數據也不會丟失。

(4)如果Rabbit隊列被寫滿,emmm,設置好在內存被占滿時將消息寫入硬盤然后搞一個大一點的硬盤吧,Rabbit默認應該就是這么做的。然后做好預警,當消息達到一定量時抓緊處理,一般來說可能性不是很大。

(5)版本沖突,如果商品表中某一條數據如商品A在同一秒內變化了兩次,消息隊列中有連續兩條消息,又由于這兩條消息可能在兩個線程中被消費,由于網絡,計算機性能等原因,先變的數據后被寫入ES中,導致ES中數據和MySql數據不一致。因此我們在更新索引時使用ES的外部版本號。使用從MySQL中取數據時的時間戳作為版本號,只有當時間戳比當前版本號大或相等時才能變更文檔,否則ES會報版本沖突錯誤。

private IndexRequest convertToUpdateQuery(Long timestamp, OutStoreProduct outStoreProduct) throws JsonProcessingException {IndexRequest indexRequest = new IndexRequest(indexName, "doc", outStoreProduct.getId());if(StringUtils.isEmpty(outStoreProduct.getTooEbaoProductId())) {log.error("商品 {} 的ebaoProductId為空,無法同步", outStoreProduct.getId());return null;}indexRequest.source(om.writeValueAsString(outStoreProduct), XContentType.JSON).versionType(VersionType.EXTERNAL_GTE).version(timestamp).routing(outStoreProduct.getTooEbaoProductId());return indexRequest;}

關于全量同步

以上只是實現了增量同步,在索引初始化時,我們需要做全量同步操作,將數據從數據庫初始化到ES索引中。我們可以在Worker中寫一個接口,該接口實現邏輯分批將數據同步任務發到消息隊列中,其它worker收到消息后完成對應任務。比如我們可以發布每一個門店的數據同步任務,worker每收到一個消息,同步一個門店的數據。

總結

綜上,本系統是一個近實時的能夠保證ES和MySQL數據一致性的高效索引同步系統。

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的sql server cdc 清理_基于CDC技术的ElasticSearch索引同步机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。