java 阻塞锁_Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具...
鎖的實現
鎖的實現其實很簡單,主要使用Java中synchronized關鍵字。
public class Lock {
private volatile boolean isLocked = false;
private Thread lockingThread = null;
public synchronized void lock() throws InterruptedExpection {
while(isLocked){
wait();
}
isLocked = true;
lockingThread = Thread.currentThread();
}
public synchronized void unlock() {
if(this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException("Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
notify();
}
}
公平鎖的實現
上面的鎖的實現嚴格意義上說是會存在線程饑餓現象的(也就是說在多線程競爭的條件下,存在一種極端情況,即某個線程一直阻塞在鎖上,永遠都是其他線程被優先喚醒,導致自己得不到執行)。下面是公平鎖的實現:
/**
* @Author: Jeysin
* @Date: 2019/4/16 12:16
* @Desc: 公平鎖的實現,不會存在線程餓死現象。
* 實現原理:每個線程在不同的對象上調用wait方法,Lock類可以決定調用哪個對象的notify方法,所以可以做到喚醒特定的線程
*/
public class FairLock {
private volatile boolean isLocked = false;
private Thread lockingThread = null;
private List waitingThreads = new ArrayList();
public void lock() throws InterruptedException{
QueueObject queueObject = new QueueObject();//首先給每個要加鎖的線程new一個QueueObject對象
boolean isLockedForThisThread = true;
synchronized (this){
waitingThreads.add(queueObject);//將這個對象添加到鏈表里,注意用synchronize關鍵字做并發控制
}
while(isLockedForThisThread){
synchronized (this) {
//判斷一下當前鎖是否沒有被占用,并且判斷當前線程對應的QueueObject是否是鏈表中的第一個(因為默認鏈表中第一個線程首先獲得鎖)
isLockedForThisThread = isLocked || waitingThreads.get(0) != queueObject;
if (!isLockedForThisThread) {
isLocked = true;
waitingThreads.remove(queueObject);
lockingThread = Thread.currentThread();
return;//鏈表中第一個線程加鎖成功后從鏈表中移除自身對應的QueueObject對象,并從這條語句返回
}
}
try{
queueObject.doWait();//其他線程阻塞在這條語句上
}catch (InterruptedException e){
synchronized (this){
waitingThreads.remove(queueObject);
throw e;
}
}
}
}
public synchronized void unlock(){
if(this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException("Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
if(waitingThreads.size() > 0){
waitingThreads.get(0).doNotify();//默認喚醒鏈表中第一個對象對應的線程,達到公平的目的
}
}
}
/**
* @Author: Jeysin
* @Date: 2019/4/16 12:20
* @Desc:
*/
public class QueueObject {
private boolean isNotified = false;
public synchronized void doWait() throws InterruptedException{
while(!isNotified){
this.wait();
}
this.isNotified = false;
}
public synchronized void doNotify(){
this.isNotified = true;
this.notify();
}
@Override
public boolean equals(Object obj) {
return this == obj;
}
}
讀寫鎖的實現
還記得秋招面試美團的時候,二面面試官的第一道編程題就是實現一個讀寫鎖,當時不會Java,用C++寫的,還記得當時用的是Linux下的pthread_mutex(也就是互斥量),耗了半個小時死活沒有實現出一個讀寫鎖,感覺怎么寫都不對,都有點懷疑人生了,毫無疑問那場面試掛掉了。當時我就在想,肯定是一開始思路就錯了,pthread_mutex雖然也可以實現一個鎖的功能,但是離實現讀寫鎖還是差了太遠,一個pthread_mutex肯定是不行的(甚至用兩個也不行,別問我是怎么知道的,我在那半個小時的面試里嘗試了無數次最后還是不行)。直到最近看了Java版本的一個實現,synchronized加上wait和notify完美解決問題,我才意識到果然是一開始思路就錯了,也許當時我用一個pthread_mutex和一個pthread_cond就可以解決問題?,F在想來,要實現一個讀寫鎖最關鍵的地方要有線程的喚醒機制,notify可以做到,pthread_cond也可以做到,但是光用pthread_mutex是不可能做到的。啥也不說了,Java大法好。
/**
* @Author: Jeysin
* @Date: 2019/4/16 22:01
* @Desc: 不可重入的讀寫鎖實現
*/
public class ReadWriteLock {
private volatile int readers = 0;
private volatile int writers = 0;
private volatile int writeRequests = 0;
public synchronized void lockRead() throws InterruptedException{
while(writers > 0 || writeRequests > 0){
this.wait();
}
++readers;
}
public synchronized void unlockRead(){
--readers;
this.notifyAll();
}
public synchronized void lockWrite() throws InterruptedException{
++writeRequests;
while(readers > 0 || writers > 0){
wait();
}
--writeRequests;
++writers;
}
public synchronized void unlockWrite(){
--writers;
notifyAll();
}
}
順帶附上一個可重入版本的讀寫鎖實現:
/**
* @Author: Jeysin
* @Date: 2019/4/16 22:33
* @Desc: 可重入讀寫鎖的實現
*/
public class ReentrantReadWriteLock {
private Map readingThreadsMap = new HashMap();
private volatile int writers = 0;
private volatile int writeRequests = 0;
private volatile Thread writingThread = null;
public synchronized void lockRead() throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(!canGrantReadAccess(callingThread)){
wait();
}
readingThreadsMap.put(callingThread,getAccessCount(callingThread) + 1);
}
public synchronized void unlockRead(){
Thread callingThread = Thread.currentThread();
int count = getAccessCount(callingThread);
if(count == 1){
readingThreadsMap.remove(callingThread);
}else {
readingThreadsMap.put(callingThread, count-1);
}
notifyAll();
}
public synchronized void lockWrite() throws InterruptedException{
++writeRequests;
Thread callingThread = Thread.currentThread();
while(!canGrantWriteAccess(callingThread)){
wait();
}
--writeRequests;
++writers;
writingThread = callingThread;
}
public synchronized void unlockWrite(){
--writers;
if(writers == 0){
writingThread = null;
}
notifyAll();
}
private boolean canGrantWriteAccess(Thread callingThread){
if(readingThreadsMap.size() > 0){
return false;
}
if(writers > 0 && writingThread != callingThread){
return false;
}
return true;
}
private boolean canGrantReadAccess(Thread callingThread){
if(writers > 0){
return false;
}
if(readingThreadsMap.get(callingThread) != null){
return true;
}
if(writeRequests > 0){
return false;
}
return true;
}
private Integer getAccessCount(Thread callingThread){
Integer count = readingThreadsMap.get(callingThread);
if(count == null){
return 0;
}
return count;
}
}
信號量
信號量的實現同樣也可以借用synchronized關鍵字,不得不說,synchronized大法好啊~
/**
* @Author: Jeysin
* @Date: 2019/4/18 15:16
* @Desc: 信號量的實現
*/
public class Semaphore {
private volatile boolean signal = false;
public synchronized void take(){
this.signal = true;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(!this.signal){
wait();
}
this.signal = false;
}
}
/**
* @Author: Jeysin
* @Date: 2019/4/18 15:21
* @Desc: 有上限的信號量的實現
*/
public class BoundedSemaphore {
private volatile int signal = 0;
private volatile int bound = 0;
public BoundedSemaphore(int bound){
this.bound = bound;
}
public synchronized void take() throws InterruptedException{
while(this.signal == this.bound){
wait();
}
++signal;
notify();
}
public synchronized void release() throws InterruptedException{
while(signal == 0){
wait();
}
--signal;
notify();
}
}
阻塞隊列
/**
* @Author: Jeysin
* @Date: 2019/4/18 15:43
* @Desc: 阻塞隊列的實現
*/
public class BlockQueue {
private List queue = new LinkedList();
private volatile int limit = 10;
public BlockQueue(int limit){
this.limit = limit;
}
public synchronized void enqueue(Object object) throws InterruptedException{
while(this.queue.size() > limit){
wait();
}
if(this.queue.size() == 1){
notifyAll();
}
queue.add(object);
}
public synchronized Object dequeue() throws InterruptedException{
while(this.queue.size() == 0){
wait();
}
if(this.queue.size() == limit){
notifyAll();
}
return this.queue.remove(0);
}
}
線程池
有了阻塞隊列,線程池的實現就很簡單了
/**
* @Author: Jeysin
* @Date: 2019/4/18 16:07
* @Desc: 線程池的實現
*/
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List threads = new ArrayList();
private volatile boolean isStopped = false;
public ThreadPool(int threadNums, int maxTaskNums){
this.taskQueue = new LinkedBlockingQueue(maxTaskNums);
for(int i=0; i
threads.add(new PoolThread(taskQueue));
}
for(PoolThread poolThread : threads){
poolThread.start();
}
}
public synchronized void execute(Runnable task){
if(this.isStopped){
throw new IllegalStateException("Thread pool is stopped");
}
this.taskQueue.add(task);
}
public synchronized void stop(){
this.isStopped = true;
for(PoolThread poolThread : threads){
poolThread.toStop();
}
}
}
/**
* @Author: Jeysin
* @Date: 2019/4/18 16:09
* @Desc:
*/
public class PoolThread extends Thread {
private BlockingQueue taskQueue = null;
private volatile boolean isStopped = false;
public PoolThread(BlockingQueue queue){
this.taskQueue = queue;
}
@Override
public void run() {
while(!isStopped){
try{
Runnable runnable = taskQueue.take();
runnable.run();
}catch (Exception e){
e.printStackTrace();
}
}
}
public synchronized void toStop(){
isStopped = true;
this.interrupt();
}
}
總結
以上是生活随笔為你收集整理的java 阻塞锁_Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 路由器散热也可以这样用 路由器散热如何处
- 下一篇: java 定义变量时 赋值与不赋值_探究