基于redis分布式锁实现的多线程并发程序(原创)
第一個版本問題:? 局限于單機版,依賴于 Jvm的鎖
第二個版本問題:? 極端情況下,解鎖邏輯的問題,線程B的鎖,可能會被線程A解掉,這種情況實際上是不合理的。
1.由于是客戶端自己生成過期時間,所以需要強制需要分布式下每個客戶端的時間必須同步。
2.當鎖過期的時候,如果多個客戶端同時執行jedis.getSet()方法,那么雖然最終只有一個客戶端可以加鎖,但是這個客戶端的
鎖的過期時間可能被其他客戶端覆蓋。
3.鎖不具備擁有者標識,即任何客戶端都可以解鎖。
版本一:?http://www.cnblogs.com/xifenglou/p/8807323.html
版本二:?http://www.cnblogs.com/xifenglou/p/8883717.html
所以基于以上問題,第三個版本出來了,Talk is cheap, show me the code!
import org.springframework.util.StopWatch; import redis.clients.jedis.Jedis;import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executors;/*** 使用RedisTool.tryGetDistributedLock* 實現分布式鎖* 終極版本*/ public class TicketRunnable3 implements Runnable {private CountDownLatch count;private CyclicBarrier barrier;private static final Integer Lock_Timeout = 10000;private static final String lockKey = "LockKey";private volatile static boolean working = true;public TicketRunnable3(CountDownLatch count, CyclicBarrier barrier) {this.count = count;this.barrier = barrier;}private int num = 20; //總票數 此處可隨意寫一個數,保證線程能運行起來,真正的共享變量不應該寫死在程序中,應該從redis中獲取,這樣模擬多進程多線程的并發訪問public void sellTicket(Jedis jedis) {String name = Thread.currentThread().getName();try {boolean getLock = RedisTool.tryGetDistributedLock(jedis,lockKey, name, Lock_Timeout);if (getLock) {if (!working) return;//Do your jobnum = Integer.parseInt(jedis.get("ticket"));if (num > 0) {num--;jedis.set("ticket", num+"");if (num != 0)System.out.println("==============="+Thread.currentThread().getName()+"============= 售出票號" + (num+1)+", 還剩" + num + "張票--");else {System.out.println("================"+Thread.currentThread().getName()+"================= 售出票號" + (num+1)+",票已經票完!--");working = false;}}} else {//System.out.println();if (!working) return;System.out.println(Thread.currentThread().getName()+" Try to get the Lock, and wait 20 millisecond...");Thread.sleep(10);}} catch (Exception e) {System.out.println(e);} finally {try {if (RedisTool.releaseDistributedLock(jedis, lockKey, name)) {Thread.sleep(30);}} catch(Exception e) {e.printStackTrace();}}} }@Override public void run() {System.out.println(Thread.currentThread().getName()+"到達,等待中...");Jedis jedis = new Jedis("localhost", 6379);try {barrier.await(); //此處阻塞 等所有線程都到位后 一起進行搶票if (Thread.currentThread().getName().equals("pool-1-thread-1")) {System.out.println("----------------全部線程準備就緒,開始搶票-----------");} else {Thread.sleep(5);}while (working) {sellTicket(jedis);}count.countDown(); //當前線程結束后,計數器-1} catch (Exception e) {e.printStackTrace(); } }/*** * @param args*/ public static void main(String[] args) {int threadNum = 5; //模擬多個窗口 進行售票final CyclicBarrier barrier = new CyclicBarrier(threadNum);final CountDownLatch count = new CountDownLatch(threadNum); //用于統計 執行時長StopWatch watch = new StopWatch();watch.start();TicketRunnable3 tickets = new TicketRunnable3(count, barrier);ExecutorService executorService = Executors.newFixedThreadPool(threadNum);//ExecutorService executorService = Executors.newCachedThreadPool();for (int i=0; i<threadNum; i++) { //此處 設置數值 受限于 線程池中的數量executorService.submit(tickets);}try {count.await();executorService.shutdown();watch.stop();System.out.println("耗時:" + watch.getTotalTimeSeconds() + "秒");} catch (InterruptedException e) {e.printStackTrace();} } }import redis.clients.jedis.Jedis; import java.util.Collections;public class RedisTool private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";private static final Long RELEASE_SUCCESS = 1L;/*** 嘗試獲取分布式鎖* @param jedis Redis客戶端* @param lockKey 鎖* @param requestId 請求標識* @param expireTime 超期時間* @return 是否獲取成功*/public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);if (LOCK_SUCCESS.equals(result)) {System.out.println("=============="+Thread.currentThread().getName()+"===========獲取到鎖,開始工作!");return true;}return false;}/*** 釋放分布式鎖* @param jedis Redis客戶端* @param lockKey 鎖* @param requestId 請求標識* @return 是否釋放成功*/public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS1[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));if (RELEASE_SUCCESS.equals(result)) {System.out.println("==========="+Thread.currentThread().getName()+"=========== 解鎖成功!");return true;}return false;} }解鎖部分,我們將Lua代碼傳到jedis.eval()方法里,并使參數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua代碼交給Redis服務端執行。
那么這段Lua代碼的功能是什么呢?其實很簡單,首先獲取對應的value值,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)。那么為什么要使用Lua語言來實現呢?因為要確保上述操作時原子性的。源于Redis的特性,下面是官網對eval命令的部分解釋:
簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,并且直到eval命令執行完成,Redis才會執行其他命令。
運行結果如下:
針對?上述代碼,使用兩個類?運行,
TicketRunnable3? TicketRunnable4?模擬多進程? 多線程場景 ,
場景1:?運行時長 >?過期時長? ??
此時:?鎖自動失效,?線程均不用解鎖,即使解鎖也是失敗!
?
代碼及運行結果如下:
import org.springframework.util.StopWatch; import redis.clients.jedis.Jedis;import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;/*** 使用RedisTool.tryGetDistributedLock* 實現 分布式鎖* 終極版本*/ public class TicketRunnable4 implements Runnable {private CountDownLatch count;private CyclicBarrier barrier;private static final Integer Lock_Timeout = 3000; //過期時間 代表3秒后過期private static final Integer ExecuteTime = 5000; private static final Integer RetryInterval = 20;private static final String lockKey = "LockKey";private valatile static boolean working = true;public TicketRunnable4(CountDownLatch count, CyclicBarrier barrier) {this.count = count;this.barrier = barrier;}private int num = 20; //總票數public void sellTicket(Jedis jedis) {String name = Thread.currentThread().getName();boolean gotLock = false;try {gotLock = RedisTool.tryGetDistributedLock(jedis, lockKey, name, Lock_Timeout);if (gotLock && working) {// Do your jobnum = Integer.parseInt(jedis.get("ticket"));if (num > 0) {num--;jedis.set("ticket", num+"");if (num != 0) System.out.println("============"+name+"=================== 售出票號" + (num+1)+",還剩" + num + "張票--");else {System.out.println("============"+name+"=================== 售出票號" + (num+1)+", 票已經售完!--");return;}}if (num == 0) {System.out.println("==============="+name+"==================票已經被搶空啦");working = false;}Thread.sleep(ExecuteTime);} else {//System.out.println();//System.out.println(name+" Try to get the Lock, and wait " + RetryInterval + " millisecond...");Thread.sleep(RetryInterval);}} catch(Exception e) {System.out.println(e);} finally{try {if (!gotLock || !working) //未獲取到鎖的線程不用解鎖return;/*** 解鎖成功后sleep,嘗試讓出cpu給其他線程機會* 解鎖失敗 說明鎖已經失效 被其他線程獲取到*/if (RedisTool.releaseDistributedLock(jedis, lockKey, name)) {Thread.sleep(100);}} catch (Exception e) {e.printStackTrace();} }}@Overridepublic void run() {String prefix = "#";String threadName = Thread.currentThread().getName();Thread.currentThread().setName(prefix+threadName);System.out.println(Thread.currentThread().getName() +"到達,等待中...");Jedis jedis = new Jedis("localhost", 6379);try {barrier.await();if (Thread.currentThread().getName().equals(prefix+"pool-1-thread-2")) {System.out.println("-------------------全部線程準備就緒,開始搶票------");} else {Thread.sleep(5);}while(working) {sellTicket(jedis);}count.countDown(); //當前線程結束后,計數器-1} catch (Exception e) {e.printStackTrace();}}/*** * @param args*/public static void main(String[] args) {int threadNum = 3; //模擬多個窗口 進行售票final CyclicBarrier = new CyclicBarrier(threadNum);final CountDownLatch count = new CountDownLatch(threadNum); //用于統計 執行時長StopWatch watch = new StopWatch();watch.start();TicketRunnable4 tickets = new TicketRunnable4(count, barrier);ExecutorService executorService = Executors.newFixedThreadPool(threadNum);//ExecutorService executorService = Executors.newCachedThreadPool();for (int i=0; i<threadNum; i++) { //此處設置數值 受限于 線程池中的數量executorService.submit(tickets);}try {count.await();executorService.shutdown();watch.stop();System.out.println("耗時:" + watch.getTotalTimeSeconds() + "秒");} catch (InterruptedException e) {e.printStackTrace();}} }TicketRunnable3 售出 10 9 8 6 5 3 2 1 票號。
TicketRunnable4售出 7 4 兩個票號
合計10張票,模擬結束!
場景2:?運行時長 < 過期時長? ??
此時:?此時需要有鎖線程去釋放鎖,這樣多線程再去競爭獲取鎖。
修改代碼:
private static final Integer Lock_Timeout = 5000; //將時間從3秒改為5秒
private static final Integer ExecuteTime = 3000;? //將執行時間5秒改為3秒
運行結果如下:
一個進程售出 10 9 8 6 4 3 1 票號
另一個進程售出 7 5 2 票號
此時 每個線程完成任務后,均需要釋放鎖,這樣本地線程或是異地線程 才能獲取到鎖,這樣才能有機會進行任務的執行!
?
?
總結
以上是生活随笔為你收集整理的基于redis分布式锁实现的多线程并发程序(原创)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nginx开启core dump文件
- 下一篇: 如何做可靠的分布式锁,Redlock真的