日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等

發(fā)布時間:2024/2/28 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

容器

物理結構:數(shù)組、鏈表
邏輯結構:很多

Queue主要是為高并發(fā)準備的。

Vector Hashtable

Vector Hashtable 自帶鎖,有很多設計上不完善的地方,現(xiàn)在基本上不用。
測試Hashtable的性能:用100的線程,先put進去1000000個數(shù),再get 1000000個數(shù)

package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Hashtable; import java.util.UUID;public class T01_TestHashtable {static Hashtable<UUID, UUID> m = new Hashtable<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count / THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for (int i = start; i < start + gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for (int i = 0; i < threads.length; i++) {threads[i] = new MyThread(i * (count / THREAD_COUNT));}for (Thread t : threads) {t.start();}for (Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println("size=" + m.size());//---------------上面是測試put,下面是測試get 10000000次的第10的元素--------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for (Thread t : threads) {t.start();}for (Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);} }

直接使用Hashmap,多線程時會出現(xiàn)問題

package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.HashMap; import java.util.UUID;public class T02_TestHashMap {static HashMap<UUID, UUID> m = new HashMap<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());} }

使用SynchronizedHashMap,效率與直接使用Hashtable區(qū)別不是很大

package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID;public class T03_TestSynchronizedHashMap {static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>());static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);} }

使用ConcurrentHashMap,插入的效率與前面的Hashtable差不多,但是讀取的效率非常高。

package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap;public class T04_TestConcurrentHashMap {static Map<UUID, UUID> m = new ConcurrentHashMap<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);} }

最早的容器Vector是自帶鎖的,但是你整個操作調用了兩個原子方法的話,整體并不是原子的。你還需要在外面加sync

/*** 有N張火車票,每張票都有一個編號* 同時有10個窗口對外售票* 請寫一個模擬程序* <p>* 分析下面的程序可能會產生哪些問題?* 重復銷售?超量銷售?* <p>* 使用Vector或者Collections.synchronizedXXX* 分析一下,這樣能解決問題嗎?* <p>* 就算操作A和B都是同步的,但A和B組成的復合操作也未必是同步的,仍然需要自己進行同步* 就像這個程序,判斷size和進行remove必須是一整個的原子操作** @author 馬士兵*/ package com.mashibing.juc.c_024_FromVectorToQueue;import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit;public class TicketSeller3 {static List<String> tickets = new LinkedList<>();static {for (int i = 0; i < 1000; i++) tickets.add("票 編號:" + i);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {synchronized (tickets) {if (tickets.size() <= 0) break;try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("銷售了--" + tickets.remove(0));}}}).start();}} }

使用Queue:ConcurrentLinkedQueue,里面很多方法是CAS實現(xiàn)的

/*** 有N張火車票,每張票都有一個編號* 同時有10個窗口對外售票* 請寫一個模擬程序* 分析下面的程序可能會產生哪些問題?* 重復銷售?超量銷售?* 使用Vector或者Collections.synchronizedXXX* 分析一下,這樣能解決問題嗎?* 就算操作A和B都是同步的,但A和B組成的復合操作也未必是同步的,仍然需要自己進行同步* 就像這個程序,判斷size和進行remove必須是一整個的原子操作* 使用ConcurrentQueue提高并發(fā)性*/ package com.mashibing.juc.c_024_FromVectorToQueue;import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue;public class TicketSeller4 {static Queue<String> tickets = new ConcurrentLinkedQueue<>();static {for (int i = 0; i < 1000; i++) tickets.add("票 編號:" + i);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {String s = tickets.poll();if (s == null) break;else System.out.println("銷售了--" + s);}}).start();}} }

多線程常用的容器

ConcurrentHashMap

里面用的是CAS操作,而CAS在Tree操作的時候太復雜了,所以不存在ConcurrentTreeMap,為了排序,換了跳表的結構代替Tree結構

跳表

  • 底層是鏈表
  • 拿出關鍵元素新開一層
  • 在查找的時候,從上往下查
  • CAS的實現(xiàn)難度比TreeMap容易很多
  • 查找操作的時間復雜度比鏈表快很多
/*** http://blog.csdn.net/sunxianghuang/article/details/52221913 * http://www.educity.cn/java/498061.html* 閱讀concurrentskiplistmap*/ package com.mashibing.juc.c_025;import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch;public class T01_ConcurrentMap {public static void main(String[] args) {Map<String, String> map = new ConcurrentHashMap<>();//Map<String, String> map = new ConcurrentSkipListMap<>(); //高并發(fā)并且排序//Map<String, String> map = new Hashtable<>();//Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX//TreeMapRandom r = new Random();Thread[] ths = new Thread[100];CountDownLatch latch = new CountDownLatch(ths.length);long start = System.currentTimeMillis();for(int i=0; i<ths.length; i++) {ths[i] = new Thread(()->{for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000));latch.countDown();});}Arrays.asList(ths).forEach(t->t.start());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(map.size());} }

CopyOnWriteArrayList

本質上和ReadWrite是一個思路

寫時復制,適用于讀線程多,寫線程少的情況。(讀的時候不加鎖),寫的時候copy一個新的,寫完之后把舊的指針指向新的。
寫的效率比較低,因為是數(shù)組,每次寫的時候都要復制。
add操作的源碼如下:
示例

/*** 寫時復制容器 copy on write* 多線程環(huán)境下,寫時效率低,讀時效率高* 適合寫少讀多的環(huán)境*/ package com.mashibing.juc.c_025;import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.Vector; import java.util.concurrent.CopyOnWriteArrayList;public class T02_CopyOnWriteList {public static void main(String[] args) {List<String> lists =//new ArrayList<>(); //這個會出并發(fā)問題!//new Vector();new CopyOnWriteArrayList<>();Random r = new Random();Thread[] ths = new Thread[100];for (int i = 0; i < ths.length; i++) {Runnable task = new Runnable() {@Overridepublic void run() {for (int i = 0; i < 1000; i++) lists.add("a" + r.nextInt(10000));}};ths[i] = new Thread(task);}runAndComputeTime(ths);System.out.println(lists.size());}static void runAndComputeTime(Thread[] ths) {long s1 = System.currentTimeMillis();Arrays.asList(ths).forEach(t -> t.start());Arrays.asList(ths).forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long s2 = System.currentTimeMillis();System.out.println(s2 - s1);} }

代碼T05-T09:BlockingQueue講解

Queue和List的區(qū)別是什么?

  • 提供了很多在多線程訪問下比較友好的API
  • offer,peek,pool

BlockingQueue的特點是什么?
BlockingQueue的優(yōu)勢在于,增加了更多API,比如put,take
或者阻塞,或者指定時間等待
實現(xiàn)生產者-消費者模型,也是多線程里面最重要的一個模型,也是MQ的基礎——MQ的本質,就是一個大型的生產者、消費者模型

LinkedBlockingQueue

LinkedBlockingQueue,是用鏈表實現(xiàn)的BlockingQueue
阻塞使用await實現(xiàn)的,底層應該是park

Queue常用接口
LinkedBlockingQueue示例

package com.mashibing.juc.c_025;import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit;public class T05_LinkedBlockingQueue {static BlockingQueue<String> strs = new LinkedBlockingQueue<>();static Random r = new Random();public static void main(String[] args) {new Thread(() -> {for (int i = 0; i < 100; i++) {try {strs.put("a" + i); //如果滿了,就會等待TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}, "p1").start();for (int i = 0; i < 5; i++) {new Thread(() -> {for (; ; ) {try {System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就會等待} catch (InterruptedException e) {e.printStackTrace();}}}, "c" + i).start();}} }

DelayQueue

是BlockingQueue的一種,是一種阻塞的隊列。
需要實現(xiàn)compareTo方法
需要指定等待時間
用來按時間進行任務調度

package com.mashibing.juc.c_025;import java.util.Calendar; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;public class T07_DelayQueue {static BlockingQueue<MyTask> tasks = new DelayQueue<>();static Random r = new Random();static class MyTask implements Delayed {String name;long runningTime;MyTask(String name, long rt) {this.name = name;this.runningTime = rt;}@Overridepublic int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))return -1;else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))return 1;elsereturn 0;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return name + " " + runningTime;}}public static void main(String[] args) throws InterruptedException {long now = System.currentTimeMillis();MyTask t1 = new MyTask("t1", now + 1000);MyTask t2 = new MyTask("t2", now + 2000);MyTask t3 = new MyTask("t3", now + 1500);MyTask t4 = new MyTask("t4", now + 2500);MyTask t5 = new MyTask("t5", now + 500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for (int i = 0; i < 5; i++) {System.out.println(tasks.take());}} }

PriorityQueue

內部進行了排序,底層是一個二叉樹(小頂堆)的結構

package com.mashibing.juc.c_025;import java.util.PriorityQueue;public class T07_01_PriorityQueque {public static void main(String[] args) {PriorityQueue<String> q = new PriorityQueue<>();q.add("c");q.add("e");q.add("a");q.add("d");q.add("z");for (int i = 0; i < 5; i++) {System.out.println(q.poll());}} } 輸出: a c d e z

SynchronousQueue

容量為0,不能往里裝東西,只有有一個線程等著的時候,才能把東西遞到這個線程手里,是用來一個線程給另外一個線程傳數(shù)據(jù)的。
本質和Exchanger比較相似,也是需要兩個線程同步對接,否則都會阻塞著。

在線程池里面,線程之間進行任務調度的時候,經常會用到。

package com.mashibing.juc.c_025;import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue;public class T08_SynchronusQueue { //容量為0public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strs = new SynchronousQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.put("aaa"); //阻塞等待消費者消費//strs.put("bbb");//strs.add("aaa");System.out.println(strs.size());} }

TransferQueue

裝完,阻塞等著,有線程把它取走,再離開
要先開啟消費者線程,再往里面transfer,要不然就阻塞了~

場景1:要求某件任務有一個結果(比如一個訂單等付款完成之后,確認有線程去處理它了,再給客戶反饋)
場景2:確認收錢完成之后,才能把商品取走,比如面對面付款

示例:

package com.mashibing.juc.c_025;import java.util.concurrent.LinkedTransferQueue;public class T09_TransferQueue {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.transfer("aaa");//strs.put("aaa");/*new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();*/} }

經典的交替打印面試題可以用 TransferQueue 實現(xiàn)

package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TransferQueue;public class T13_TransferQueue {public static void main(String[] args) {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();TransferQueue<Character> queue = new LinkedTransferQueue<Character>();new Thread(() -> {try {for (char c : aI) {System.out.print(queue.take());queue.transfer(c);}} catch (InterruptedException e) {e.printStackTrace();}}, "t1").start();new Thread(() -> {try {for (char c : aC) {queue.transfer(c);System.out.print(queue.take());}} catch (InterruptedException e) {e.printStackTrace();}}, "t2").start();} }

下節(jié)課預習

  • Callable
  • Future, Completable Future

總結

以上是生活随笔為你收集整理的多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。