多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等
容器
物理結構:數(shù)組、鏈表
邏輯結構:很多
Queue主要是為高并發(fā)準備的。
Vector Hashtable
Vector Hashtable 自帶鎖,有很多設計上不完善的地方,現(xiàn)在基本上不用。
測試Hashtable的性能:用100的線程,先put進去1000000個數(shù),再get 1000000個數(shù)
直接使用Hashmap,多線程時會出現(xiàn)問題
package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.HashMap; import java.util.UUID;public class T02_TestHashMap {static HashMap<UUID, UUID> m = new HashMap<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());} }使用SynchronizedHashMap,效率與直接使用Hashtable區(qū)別不是很大
package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID;public class T03_TestSynchronizedHashMap {static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>());static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);} }使用ConcurrentHashMap,插入的效率與前面的Hashtable差不多,但是讀取的效率非常高。
package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap;public class T04_TestConcurrentHashMap {static Map<UUID, UUID> m = new ConcurrentHashMap<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);} }最早的容器Vector是自帶鎖的,但是你整個操作調用了兩個原子方法的話,整體并不是原子的。你還需要在外面加sync
/*** 有N張火車票,每張票都有一個編號* 同時有10個窗口對外售票* 請寫一個模擬程序* <p>* 分析下面的程序可能會產生哪些問題?* 重復銷售?超量銷售?* <p>* 使用Vector或者Collections.synchronizedXXX* 分析一下,這樣能解決問題嗎?* <p>* 就算操作A和B都是同步的,但A和B組成的復合操作也未必是同步的,仍然需要自己進行同步* 就像這個程序,判斷size和進行remove必須是一整個的原子操作** @author 馬士兵*/ package com.mashibing.juc.c_024_FromVectorToQueue;import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit;public class TicketSeller3 {static List<String> tickets = new LinkedList<>();static {for (int i = 0; i < 1000; i++) tickets.add("票 編號:" + i);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {synchronized (tickets) {if (tickets.size() <= 0) break;try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("銷售了--" + tickets.remove(0));}}}).start();}} }使用Queue:ConcurrentLinkedQueue,里面很多方法是CAS實現(xiàn)的
/*** 有N張火車票,每張票都有一個編號* 同時有10個窗口對外售票* 請寫一個模擬程序* 分析下面的程序可能會產生哪些問題?* 重復銷售?超量銷售?* 使用Vector或者Collections.synchronizedXXX* 分析一下,這樣能解決問題嗎?* 就算操作A和B都是同步的,但A和B組成的復合操作也未必是同步的,仍然需要自己進行同步* 就像這個程序,判斷size和進行remove必須是一整個的原子操作* 使用ConcurrentQueue提高并發(fā)性*/ package com.mashibing.juc.c_024_FromVectorToQueue;import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue;public class TicketSeller4 {static Queue<String> tickets = new ConcurrentLinkedQueue<>();static {for (int i = 0; i < 1000; i++) tickets.add("票 編號:" + i);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {String s = tickets.poll();if (s == null) break;else System.out.println("銷售了--" + s);}}).start();}} }多線程常用的容器
ConcurrentHashMap
里面用的是CAS操作,而CAS在Tree操作的時候太復雜了,所以不存在ConcurrentTreeMap,為了排序,換了跳表的結構代替Tree結構
跳表
- 底層是鏈表
- 拿出關鍵元素新開一層
- 在查找的時候,從上往下查
- CAS的實現(xiàn)難度比TreeMap容易很多
- 查找操作的時間復雜度比鏈表快很多
CopyOnWriteArrayList
本質上和ReadWrite是一個思路
寫時復制,適用于讀線程多,寫線程少的情況。(讀的時候不加鎖),寫的時候copy一個新的,寫完之后把舊的指針指向新的。
寫的效率比較低,因為是數(shù)組,每次寫的時候都要復制。
add操作的源碼如下:
示例
代碼T05-T09:BlockingQueue講解
Queue和List的區(qū)別是什么?
- 提供了很多在多線程訪問下比較友好的API
- offer,peek,pool
BlockingQueue的特點是什么?
BlockingQueue的優(yōu)勢在于,增加了更多API,比如put,take
或者阻塞,或者指定時間等待
實現(xiàn)生產者-消費者模型,也是多線程里面最重要的一個模型,也是MQ的基礎——MQ的本質,就是一個大型的生產者、消費者模型
LinkedBlockingQueue
LinkedBlockingQueue,是用鏈表實現(xiàn)的BlockingQueue
阻塞使用await實現(xiàn)的,底層應該是park
Queue常用接口
LinkedBlockingQueue示例
DelayQueue
是BlockingQueue的一種,是一種阻塞的隊列。
需要實現(xiàn)compareTo方法
需要指定等待時間
用來按時間進行任務調度
PriorityQueue
內部進行了排序,底層是一個二叉樹(小頂堆)的結構
package com.mashibing.juc.c_025;import java.util.PriorityQueue;public class T07_01_PriorityQueque {public static void main(String[] args) {PriorityQueue<String> q = new PriorityQueue<>();q.add("c");q.add("e");q.add("a");q.add("d");q.add("z");for (int i = 0; i < 5; i++) {System.out.println(q.poll());}} } 輸出: a c d e zSynchronousQueue
容量為0,不能往里裝東西,只有有一個線程等著的時候,才能把東西遞到這個線程手里,是用來一個線程給另外一個線程傳數(shù)據(jù)的。
本質和Exchanger比較相似,也是需要兩個線程同步對接,否則都會阻塞著。
在線程池里面,線程之間進行任務調度的時候,經常會用到。
package com.mashibing.juc.c_025;import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue;public class T08_SynchronusQueue { //容量為0public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strs = new SynchronousQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.put("aaa"); //阻塞等待消費者消費//strs.put("bbb");//strs.add("aaa");System.out.println(strs.size());} }TransferQueue
裝完,阻塞等著,有線程把它取走,再離開
要先開啟消費者線程,再往里面transfer,要不然就阻塞了~
場景1:要求某件任務有一個結果(比如一個訂單等付款完成之后,確認有線程去處理它了,再給客戶反饋)
場景2:確認收錢完成之后,才能把商品取走,比如面對面付款
示例:
package com.mashibing.juc.c_025;import java.util.concurrent.LinkedTransferQueue;public class T09_TransferQueue {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.transfer("aaa");//strs.put("aaa");/*new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();*/} }經典的交替打印面試題可以用 TransferQueue 實現(xiàn)
package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TransferQueue;public class T13_TransferQueue {public static void main(String[] args) {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();TransferQueue<Character> queue = new LinkedTransferQueue<Character>();new Thread(() -> {try {for (char c : aI) {System.out.print(queue.take());queue.transfer(c);}} catch (InterruptedException e) {e.printStackTrace();}}, "t1").start();new Thread(() -> {try {for (char c : aC) {queue.transfer(c);System.out.print(queue.take());}} catch (InterruptedException e) {e.printStackTrace();}}, "t2").start();} }下節(jié)課預習
- Callable
- Future, Completable Future
總結
以上是生活随笔為你收集整理的多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JVM从入门到精通(五): Java运行
- 下一篇: JVM从入门到精通(六):JVM调优必备