监听Canal
Canal提供了各種語言的客戶端,當Canal監聽到binlog變化時,會通知Canal的客戶端
?
我們可以利用Canal提供的Java客戶端,監聽Canal通知消息。當收到變化的消息時,完成對緩存的更新。
不過這里我們會使用GitHub上的第三方開源的canal-starter客戶端。地址:GitHub - NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客戶端 canal client
與SpringBoot完美整合,自動裝配,比官方客戶端要簡單好用很多。
引入依賴:
<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version> </dependency>編寫配置:
canal:destination: leon # canal的集群名字,要與安裝canal時設置的名稱一致server: 192.168.150.101:11111 # canal服務地址修改Item實體類
通過@Id、@Column、等注解完成Item與數據庫表字段的映射:
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Transient;import javax.persistence.Column; import java.util.Date;@Data @TableName("tb_item") public class Item {@TableId(type = IdType.AUTO)@Idprivate Long id;//商品id@Column(name = "name")private String name;//商品名稱private String title;//商品標題private Long price;//價格(分)private String image;//商品圖片private String category;//分類名稱private String brand;//品牌名稱private String spec;//規格private Integer status;//商品狀態 1-正常,2-下架private Date createTime;//創建時間private Date updateTime;//更新時間@TableField(exist = false)@Transientprivate Integer stock;@TableField(exist = false)@Transientprivate Integer sold; }編寫監聽器
通過實現EntryHandler<T>接口編寫監聽器,監聽Canal消息。注意兩點:
-
實現類通過@CanalTable("tb_item")指定監聽的表信息
-
EntryHandler的泛型是與表對應的實體類
在這里對Redis的操作都封裝到了RedisHandler這個對象中,是我們之前做緩存預熱時編寫的一個類,內容如下:
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component;import java.util.List;@Component public class RedisHandler implements InitializingBean {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate IItemService itemService;@Autowiredprivate IItemStockService stockService;private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void afterPropertiesSet() throws Exception {// 初始化緩存// 1.查詢商品信息List<Item> itemList = itemService.list();// 2.放入緩存for (Item item : itemList) {// 2.1.item序列化為JSONString json = MAPPER.writeValueAsString(item);// 2.2.存入redisredisTemplate.opsForValue().set("item:id:" + item.getId(), json);}// 3.查詢商品庫存信息List<ItemStock> stockList = stockService.list();// 4.放入緩存for (ItemStock stock : stockList) {// 2.1.item序列化為JSONString json = MAPPER.writeValueAsString(stock);// 2.2.存入redisredisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);}}public void saveItem(Item item) {try {String json = MAPPER.writeValueAsString(item);redisTemplate.opsForValue().set("item:id:" + item.getId(), json);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public void deleteItemById(Long id) {redisTemplate.delete("item:id:" + id);} } 超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
- 上一篇: Canal数据同步策略
- 下一篇: 服务异步通信