Redis03-优惠券秒杀
一、分布式Id:訂單id
在分布式架構下,傳統生成Id的方式已經不再適用,應該生成全局唯一的
分布式Id需要滿足的五個特性:全局性、唯一性、安全性、可用性、高性能
@Component public class RedisIdWorker {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private final long COUNT_BITS = 32L;private final long BEGIN = 1652197856L;public long nextId(String keyPrefix){//1.獲取當前時間戳與項目運行時間戳的差值long timeFiled = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) - BEGIN;//2.生成序列化號String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);return timeFiled << COUNT_BITS | count;} }每個業務每天都重新開始計數,求出當前時間與項目運行時間戳的差值,將其左移32位在對count進行 | 操作后轉化為十進制
該方法的思路:一個long類型有8個字節,用四個字節(32位)存放時間信息(136.102208 年),四個字節存放這一天的訂單數量。那么這種方法從項目開始計時,能夠正確運行2^32 秒,每天最多能生成2^32個id。
其實我們也可以只用16位來存儲當前count,剩下48位來存放時間信息,如果一天內達不到2^32次方的訂單id生成量,不如節省下來位數存儲時間信息讓項目運行的更久。
二、優惠券下單
我們先來看如下的流程:
這個流程咋一看好像沒有問題,但實際上它涉及到一個并發問題:
庫存是一個公共資源,線程對它就行讀-寫操作時如果不加鎖則會出現超賣問題
這個問題如何解決呢?當然是通過加鎖來解決,鎖大致分為樂觀鎖和悲觀鎖,我們來探討兩者的差異
樂觀鎖
樂觀鎖的思想也適用于分布式項目
對于優惠券秒殺這種寫多讀少的業務,推薦使用悲觀鎖,樂觀鎖成功率低反而更加消耗性能
一人一單
業務流程
并發問題
使用jdk提供的鎖只在它所在的jvm有效,無法鎖住其它jvm,由于該項目是一個分布式項目,我們需要采取分布式鎖解決該問題
分布式鎖
實現方式
MySQL實現方式:通過行鎖或表鎖的機制達到互斥性,但是它把鎖的壓力給到了數據庫,而數據庫又是相當脆弱的部分無法面對高并發場景,所以使用MySQL實現互斥鎖的業務并發量不能太大。具體看這篇博客:https://cloud.tencent.com/developer/article/1580632
Redis實現分布式鎖
上面這一套流程存在一個問題,它是由鎖標識一致所導致的,當線程一因為業務超時導致鎖過期時,線程二獲取到了鎖,等到線程一執行完后釋放掉的鎖此時已經是線程二的了,線程三看鎖沒了又去獲取鎖…
所以設置一個鎖標識是非常重要的,而并發是以線程為單位的,每個線程都有一個自己的鎖標識就ok啦,那拿什么給每個線程充當鎖標識呢?UUID可以嘛?理論上可以,但如果使用uuid的話,每上一次鎖就要生成一個uuid,是非常消耗性能的操作,有沒有更優化的操作呢?
其實每個線程都有一個天然的鎖標識,那就是線程id,但是不同jvm的線程id可能重復,我們可以將uuid設置為靜態變量作為鎖標識的前綴,用線程id作為鎖標識的后綴,這樣不同機器的uuid是不同的,可以保證一致性,同時uuid作為靜態變量每臺機器只需要創建一次,不需要每次上鎖都創建,提高了性能
Redis實現簡單的分布式鎖
public class SimpleRedisLock {private String key;private StringRedisTemplate stringRedisTemplate;private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}public SimpleRedisLock(String key, StringRedisTemplate stringRedisTemplate) {this.key = key;this.stringRedisTemplate = stringRedisTemplate;}public boolean tryLock(long timeSeconds){String id = ID_PREFIX + Thread.currentThread().getId();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, id, timeSeconds, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}public void unlock(){String val = stringRedisTemplate.opsForValue().get(key);if(val != null && val.equals(ID_PREFIX + Thread.currentThread().getId())){stringRedisTemplate.delete(key);}} }其實上述代碼還存在一點小問題,那就是unlock操作,由于它不是原子性操作,那么當它從redis的鎖中取值后,redis鎖過期了,那么雖然它可以執行if內的語句,但此時它的鎖已經過期了,它有可能會刪除其它線程的鎖
解決辦法也很簡單,就是想法讓unlock中的操作變成原子操作,一說這個很多人第一反應是加鎖,但jvm和redis是兩個獨立運行的進程,沒法加鎖,我們可以通過讓redis執行lua腳本來保證操作的原子性
unlock.lua
setnx實現分布式鎖存在的問題
Redission提供的分布式鎖
自動續約
我們回憶一下,之前使用setnx充當分布式鎖的時候為什么要設置過期時間呢?是防止由于redis宕機導致這個鎖無法釋放成為死鎖,那么當cpu資源緊張且某個業務執行時間比較長的時候,很可能鎖就被自動釋放導致其它線程也進入到該臨界區,存在并發安全問題,那么Redission的出現完美的解決了該問題!
Redission內置了WatchDog機制,默認每隔十秒檢查一遍鎖,若存在給它重新設置30秒過期時間,如果redis宕機了,那么WatchDog會停止續約操作,鎖會在30秒后自動釋放
WatchDog機制只有在未設置過期時間時才有效,也就是未設置leaseTime才生效
可重入
可重入機制是利用hashmap的特性,使用field標識鎖標識,val標識鎖的重入次數,釋放鎖時將val-1,若val=0,則刪除key
主從一致性
當我們的項目使用了redis集群時,當我們對主節點執行setnx進行上鎖的時候,從節點還未來得及同步主節點數據時,主節點宕機導致鎖丟失,我們可以采用redission提供的multilock的機制來解決該問題
鎖丟失
-
那么 Redisson 是如何解決上述問題的呢?既然導致主從一致性問題發生的主要原因是主從同步延時問題,Redisson 干脆直接舍棄了主從節點,所有 Redis 節點都是獨立的節點,相互之間無任何關系,都可以做讀寫操作。此時,我們想獲取鎖就必須依次向多個 Redis 都去獲取鎖(之前直接向 Master 節點獲取就可以),多個 Redis 節點都保存鎖的標識,才算獲取成功
-
這樣一來,由于所有節點都是獨立的,所以避免了主從一致性問題;又由于所有的節點都保存了鎖標識,即使由一個節點宕機,其他的節點也保存有鎖的標識,保證了高可用,并且可用性會隨著節點的增多而增高
-
此外,我們還以為給這些獨立的節點再加上從節點 Slave,即使一個獨立節點宕機了導致其對應的從節點變成新的主節點,且節點上鎖標識丟失了也沒有關系,因為我們只有在每一個節點都拿到鎖才算成功, 盡管可以在這個空虛的節點上獲取到鎖,但在其他節點上是獲取不到的,最終仍然是失敗,因此只要有任意一個節點存貨,其他線程就不可能拿到鎖,就不會出現鎖失效問題。這樣,既保留了主從同步機制,又確保了 Redis 集群的高可用特性,同時還避免了主從一致所引發的鎖失效問題,這個方案就叫做 mutilLock
簡而言之,就是對多個redis節點進行上鎖,必須全部上鎖成功才算成功,哪怕有一個節點的鎖沒有釋放當前線程都無法獲得鎖
秒殺優化
優化前
單線程處理數據的校驗以及訂單的創建,而從判斷秒殺庫存->訂單創建這個過程中我們需要采用加鎖的機制來保證不發生超賣問題和一人一單的正確性,這樣做雖然可行,但卻無法面對高并發場景,因為加鎖的過程太長,并且加鎖范圍內有好幾個隊數據庫的操作,業務太重,影響用戶體驗
優化后
將數據校驗的業務用redis處理,處理成功后返回給客戶端,而對數據庫的操作額外使用消費者線程在后臺中處理,既能保證數據一致性,又大大加快響應速度。
數據校驗
接口定義:
@PostMapping(“seckill/{id}”)
public Result seckillVoucher(@PathVariable(“id”) Long voucherId)
業務流程
注意:上述流程涉及到并發安全問題,在看下面代碼前可以思考一下加鎖的位置、對誰上鎖以及鎖的釋放
代碼如下
public Result secKillVoucher2(long id) {//1.設置keyString stock_key = RedisConstants.SECKILL_STOCK_KEY + id;String order_key = "seckill:order:" + id;long order_id = idWorker.nextId("order");//1.1判斷redis中是否有存放stock的keyif(StrUtil.isBlank(stringRedisTemplate.opsForValue().get(stock_key))) {//1.2 若不存在該key則刷新緩存Integer stock = cacheClient.queryDataByMutex(RedisConstants.SECKILL_STOCK_KEY, id, Integer.class, (v_id) -> {SeckillVoucher voucher = seckillVoucherService.getById(v_id);return voucher.getStock();}, 10, TimeUnit.MINUTES);}//2.判斷該用戶id是否存在set中Long userId = UserHolder.getUser().getId();RLock lock = redissonClient.getLock("lock:user:" + userId);boolean isLock = lock.tryLock();if(!isLock){return Result.fail("請無重復下單");}try {Boolean flag = stringRedisTemplate.opsForSet().isMember(order_key, userId.toString());if (flag) {return Result.fail("請無重復下單");}long add = stringRedisTemplate.opsForSet().add(order_key, userId.toString());//3.校驗庫存是否夠Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get(stock_key));if (stock > 0) {stringRedisTemplate.opsForValue().decrement(stock_key);} else {return Result.fail("庫存不足");}//4.封裝好訂單信息發送到消息隊列HashMap<String, String> map = new HashMap<>();map.put("id", String.valueOf(order_id));map.put("userId", String.valueOf(userId));map.put("voucherId", String.valueOf(id));stringRedisTemplate.opsForStream().add("stream.orders", map);return Result.ok(order_id);}catch (Exception e){log.error(e.getMessage());return Result.fail(e.getMessage());}finally {System.out.println("seckill unlock");if(lock.isLocked() && lock.isHeldByCurrentThread()) {lock.unlock();}}}上述代碼大家覺得有問題嗎,我對它做過壓測,同一用戶的請求一秒1000的并發量是沒有問題的,而不同用戶訪問時,會出現超賣問題,因為上鎖的范圍是針對UserId的,不同用戶之間互不干擾,我們可以通過更改鎖的對象,使用voucherId作為鎖,但這樣會大大降低并發量,接下來我將介紹一種更好的解決方案:lua腳本
lua腳本
Lua也算一門古老的語言了,玩魔獸世界的玩家應該對它不陌生,WOW的插件就是用Lua腳本編寫的。在高并發的網絡游戲中Lua大放異彩被廣泛使用。
Lua廣泛作為其它語言的嵌入腳本,尤其是C/C++,語法簡單,小巧,源碼一共才200多K,這可能也是Redis官方選擇它的原因。
為什么使用lua解決并發問題:因為在lua中的操作是原子性的,redis一旦執行某個lua腳本,在執行完成之前是不會執行其它請求的
那么我們可以將上述代碼的鎖范圍內的業務邏輯寫在lua腳本中,讓lua腳本來保證它們的串行執行
代碼
@Overridepublic Result secKillVoucher(long id) {//1.設置keyString stock_key = RedisConstants.SECKILL_STOCK_KEY + id;String order_key = "seckill:order:" + id;long order_id = idWorker.nextId("order");//1.1判斷redis中是否有存放stock的keyif(StrUtil.isBlank(stringRedisTemplate.opsForValue().get(stock_key))) {//1.2 若不存在該key則刷新緩存Integer stock = cacheClient.queryDataByMutex(RedisConstants.SECKILL_STOCK_KEY, id, Integer.class, (v_id) -> {SeckillVoucher voucher = seckillVoucherService.getById(v_id);return voucher.getStock();}, 10, TimeUnit.MINUTES);}//2.執行lua腳本,判斷是否符合條件,若符合條件則發送到消息隊列Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Arrays.asList(order_key, stock_key), UserHolder.getUser().getId().toString(),String.valueOf(order_id),String.valueOf(id));//3. 條件校驗失敗if(res.intValue() != 0){return Result.fail("下單失敗,請勿重復下單");}return Result.ok(order_id);}lua腳本
--獲取key local order_key = KEYS[1] local order_stock_key = KEYS[2]--獲取用id local user_id = ARGV[1] -- 獲取訂單id local order_id = ARGV[2] -- 獲取優惠券id local voucher_id = ARGV[3]-- 判斷庫存是否足夠 if(tonumber(redis.call("get",order_stock_key)) <= 0) thenreturn 1 end-- 判斷用戶id是否存在該商品的用戶列表中 if( redis.call("sismember",order_key,user_id) == 1) thenreturn 2 end-- 庫存-1 redis.call("incrby",order_stock_key,-1) -- 將userId添加進該商品的用戶列表 redis.call("sadd",order_key,user_id) -- 向消息隊列發送消息 redis.call("xadd","stream.orders","*","id",order_id,"userId",user_id,"voucherId",voucher_id) return 0消息隊列:Stream
在redis中,有個Stream類型的數據,可以說是為了消息隊列而生的,若是項目不太大但又需要使用消息隊列,我們可以使用redis的Stream類型來充當消息隊列,它的優點是配置簡單,不會額外增加運維成本、使用方便
基本的使用語法我已經寫在了另一篇博客中:https://blog.csdn.net/qq_42861526/article/details/124753721
接下來我主要給大家講一下Stream的特點
消費者組
Stream和消費者組通常是一起出現的,我們可以為Stream創建一個或多個消費者組,每個消費者組包含一個或多個消費者,消費者組之間共享消息,同一個消費者組下的消費者競爭消息
特點
- 消息分流:隊列中的消息會分流給消費者組中不同的消費者,不會讓他們重復消費,提高消息處理速度
- 消息標示:每個消費者組會維護一個標示,記錄它最后處理過的消息,哪怕它宕機后重啟,也能從標示之后開始消費
- 消息確認:消費者獲取消費后,消息會變成pending狀態并添加到pending-list中,當消費者對該消息執行XACK后,該消息才會重pending-list中移除
讀取、解析消息
private class VoucherHandler implements Runnable{@Overridepublic void run() {try {while(true) {//1.從消息隊列中取出訂單List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));//2.判斷消息是否為空if(list == null || list.isEmpty()){continue;}//3.解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> map = record.getValue();VoucherOrder order = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);//4.創建訂單createVoucherOrder(order);stringRedisTemplate.opsForStream().acknowledge("stream.orders","g1",record.getId());}} catch (Exception e) {log.error("處理訂單異常");log.error(e.getMessage());handlePendingMsg();}}}private void handlePendingMsg(){try {while(true) {//1.從消息隊列中取出訂單List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.from("0")));//2.判斷消息是否為空if(list == null || list.isEmpty()){break;}//3.解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> map = record.getValue();VoucherOrder order = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);//4.創建訂單createVoucherOrder(order);stringRedisTemplate.opsForStream().acknowledge("stream.orders","g1",record.getId());}} catch (Exception e) {try {Thread.sleep(1000);} catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}log.error("處理訂單異常");}}消費消息
@Transactionalpublic void createVoucherOrder(VoucherOrder order) {log.debug("創建訂單......");//1.userid設置鎖RLock lock = redissonClient.getLock("lock:order:user:" + order.getUserId());boolean tryLock = lock.tryLock();if(!tryLock){log.debug("請勿重復下單");return ;}try{//2. 查看該用戶是否搶過該優惠券Integer count = query().eq("user_id", order.getUserId()).eq("voucher_id", order.getVoucherId()).count();if (count > 0) {log.debug("請勿重復下單");return ;}//3.扣減庫存System.out.println("扣減庫存");boolean success = seckillVoucherService.update().setSql("stock = stock-1").eq("voucher_id", order.getVoucherId()).gt("stock", 0).update();//3.1庫存不足if(!success){log.debug("庫存不足");return;}//4.保存訂單到數據庫save(order);}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}問題:我們已經在發送消息時保證了并發安全,為什么在處理消息時還采用加鎖和數據校驗?
答:因為在redis主從同步的集群下,我們判斷我們的用戶id是否存在voucherId對應set中的操作是一個讀操作,它會去從節點讀取,若主節點已經添加了這個userId,而從節點還沒來得及同步消息,那么代碼會繼續往下執行,將同樣的消息發送到消息隊列中
但在我們這個項目中,由于使用lua腳本來保證執行的原子性,即使在主從集群下,lua腳本首先會發給主節點,主節點再將腳本分發給從節點一起執行,所以主從的所有節點一次只能執行一個lua腳本請求,不會出現上面所說的情況,我們添加鎖和數據校驗只是為了增強程序的健壯性,因為執行消費消息的線程是后臺執行的,它并不要求響應速度,所以額外增加一點業務也無傷大雅
總結
以上是生活随笔為你收集整理的Redis03-优惠券秒杀的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MySQL-day02作业
- 下一篇: Redisson(4)分布式锁之RedL