高并发之并发容器详解(从入门到超神)
一、ConcurrentHashMap
在上面已經提到過ConcurrentHashMap,ConcurrentHashMap相比Hashtable能夠進一步提高并發性,其原理圖如下:
HashMap,Hashtable與ConcurrentHashMap都是實現的哈希表數據結構,在隨機讀取的時候效率很高。Hashtable實現同步是利用synchronized關鍵字進行鎖定的,其是針對整張哈希表進行鎖定的,即每次鎖住整張表讓線程獨占,在線程安全的背后是巨大的浪費。ConcurrentHashMap和Hashtable主要區別就是圍繞著鎖的粒度進行區別以及如何區鎖定。
上圖中,左邊是Hashtable的實現方式,可以看到鎖住整個哈希表;而右邊則是ConcurrentHashMap的實現方式,單獨鎖住每一個桶(segment).ConcurrentHashMap將哈希表分為16個桶(默認值),諸如get(),put(),remove()等常用操作只鎖當前需要用到的桶,而size()才鎖定整張表。原來只能一個線程進入,現在卻能同時接受16個寫線程并發進入(寫線程需要鎖定,而讀線程幾乎不受限制),并發性的提升是顯而易見的。
而在迭代時,ConcurrentHashMap使用了不同于傳統集合的快速失敗迭代器(fast-fail iterator)的另一種迭代方式,稱為弱一致迭代器。在這種迭代方式中,當iterator被創建后集合再發生改變就不再是拋出ConcurrentModificationException,取而代之的是在改變時實例化出新的數據從而不影響原有的數據,iterator完成后再將頭指針替換為新的數據,這樣iterator線程可以使用原來老的數據,而寫線程也可以并發的完成改變,更重要的,這保證了多個線程并發執行的連續性和擴展性,是性能提升的關鍵。
我們在上面闡述了ConcurrentHashMap的使用特點和原理,分別在同樣的一個高并發場景下,測試不同的方式產生的延時(ms):
Map<String, String> map = new ConcurrentHashMap<>();//483 Map<String, String> map = new ConcurrentSkipListMap<>(); //高并發并且排序 559 Map<String, String> map = new Hashtable<>(); //499 Map<String, String> map =Collections.synchronizedMap(new HashMap<>()); // 530 Map<String, String> map =Collections.synchronizedMap(new TreeMap()); //905以ConcurrentLinkedQueue為例,他實現了Queue接口,實例化方式如下:
Queue<String> strs = new ConcurrentLinkedQueue<>();添加元素的方法:offer()
取出隊頭的方法:poll()
判斷隊列長度:size()
對于雙端隊列,使用ConcurrentLinkedDeque類型來實現.
下面我們再看一個具體的實例:
public class T01_ConcurrentMap {public static void main(String[] args) {Map<String, String> map = new ConcurrentHashMap<String, String>();//Map<String, String> map = new ConcurrentSkipListMap<String, String>(); //高并發并且排序//Map<String, String> map = new Hashtable<>();//Map<String, String> map = new HashMap<String, String>();Random random = new Random();Thread[] threads = new Thread[100];CountDownLatch latch = new CountDownLatch(threads.length);long start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for(int j=0; j<10000;j++) map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));latch.countDown();});}Arrays.asList(threads).forEach(t->t.start());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();System.out.println(end-start);} }啟動100個線程,向圖中添加100000個元素,分別使用Hashtable,HashMap,ConcurrentHashMap,ConcurrentSkipListMap定義map,判斷程序完成的時間。最終發現,ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是將大鎖分成若干小鎖,實現多個線程共同運行,所以,效率有很大差距。ConcurrentSkipListMap較ConcurrentHashMap除了實現高并發外還能夠排序。
參考:
http://blog.csdn.net/sunxianghuang/article/details/52221913 http://www.educity.cn/java/498061.html###二、ConcurrentQueue
與ConcurrentHashMap相同,ConcurrentQueue也是通過同樣的方式來提高并發性能的。
我們在同步容器中提到過火車票問題:
有N張火車票,每張票都有一個編號,同時有10個窗口對外售票,寫一個模擬程序。
在上述問題中,也可以使用ConcurrentQueue進一步提高并發性:
static Queue<String> tickets = new ConcurrentLinkedQueue<>();具體的代碼是這樣的:
public class TicketSeller4 {static Queue<String> tickets = new ConcurrentLinkedQueue<>();static {for(int i=0; i<1000; i++) tickets.add("票編號:" + i);}public static void main(String[] args) {for(int i=0; i<10; i++) {new Thread(()->{while(true) {String s = tickets.poll();if(s == null) break;else System.out.println("銷售了--" + s);}}).start();}} }這里面通過ConcurrentLinkedQueue的poll()方法來實現獲取容器成員的。用這個類型可以進一步提高并發性。
具體基本操作實例
public class T04_ConcurrentQueue {public static void main(String[] args) {Queue<String> strings = new ConcurrentLinkedQueue<String>();for (int i = 0; i < 10; i++) {strings.offer("a" + i); //相當于add, 放進隊列}System.out.println(strings);System.out.println(strings.size());System.out.println(strings.poll()); //取出并移除掉System.out.println(strings.size());System.out.println(strings.peek()); //取出,不會移除。相當于get(0)System.out.println(strings.size());} }三、CopyOnWriteArrayList
寫時復制容器,即copy-on-write,在多線程環境下,寫時效率低,讀時效率高,適合寫少讀多的環境。對比測試幾種情況:
List<String> lists = new ArrayList<>(); //這個會出并發問題!報錯:ArrayIndexOutOfBoundsException List<String> lists = new Vector();//111 ms List<String> lists = new CopyOnWriteArrayList<>();//5230 ms //測試核心代碼: Runnable task = new Runnable() { @Override public void run() {for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000)); } }; //多線程向該容器中不斷加入數據。從JDK 5開始Java并發包里提供了兩個使用CopyOnWrite機制實現的并發容器,它們是CopyOnWriteArrayList和CopyOnWriteArraySet。
當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然后向新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行并發的讀,而不需要加鎖,因為在當前讀的容器中不會添加任何元素。所以CopyOnWrite容器是一種讀寫分離的思想,讀和寫對應不同的容器。
四、BlockingQueue
這種并發容器,會自動實現阻塞式的生產者/消費者模式。使用隊列解耦合,在實現異步事物的時候很有用。下面的例子,實現了阻塞隊列:
LinkedBlockingQueue static BlockingQueue<String> strs = new LinkedBlockingQueue<>(10); strs.put("a" + i); //加入隊列,如果滿了,就會等待 strs.take(); //取出隊列元素,如果空了,就會等待在實例化時,可以指定具體的隊列容量。
在加入成員的時候,除了使用put方法還可以使用其他方法:
五、ArrayBlockingQueue
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);對象的方法和上面的BlockingQueue是一樣的,用法也是一樣的。
二者的區別主要是:
相比于數組實現的ArrayBlockingQueue的有界情況,我們稱之為有界隊列,LinkedBlockingQueue可認為是無界隊列。當然,也可以向上面那樣指定隊列容量,但是這個參數常常是省略的,多用于任務隊列。
六、LinkedBlockingQueue實例
public class T05_LinkedBlockingQueue {private static BlockingQueue<String> strings = new LinkedBlockingQueue<String>();private static Random r = new Random();public static void main(String[] args) {new Thread(()->{for (int i = 0; i < 100; i++) {try {strings.put("a" + i); //如果滿了,就會等待TimeUnit.SECONDS.sleep(r.nextInt(10));} catch (Exception e) {e.printStackTrace();}}}, "p1").start();for (int i = 0; i < 5; i++) {new Thread(()->{for(;;){try {System.out.println(Thread.currentThread().getName() + "take -" + strings.take()); //如果空了,就會等待} catch (Exception e) {e.printStackTrace();} }},"c" + i).start();}} }LinkedBlockingQueue是使用鏈表是實現的阻塞式容器。
七、DelayQueue
DelayQueue也是一個BlockingQueue,其特化的參數是Delayed。
Delayed擴展了Comparable接口,比較的基準為延時的時間值,Delayed接口的實現類getDelay()的返回值應為固定值(final).DelayQueue內部是使用PriorityQueue實現的,即:
可以說,DelayQueue是一個使用優先隊列(PriorityQueue)實現的BlockingQueue,優先隊列的比較基準值是時間。這是一個無界的BlockingQueue,用于放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。但是要注意的是,不能將null元素放置到這種隊列中。
Delayed,一種混合風格的接口,用來標記那些應該在給定延遲時間之后執行的對象。此接口的實現類必須重寫一個 compareTo() 方法,該方法提供與此接口的 getDelay()方法一致的排序。
DelayQueue存儲的對象是實現了Delayed接口的對象,在這個對象中,需要重寫compareTo()和getDelay()方法,例如:
因此,當我們在main()函數中,向該隊列加入元素后再取出元素的過程,就會存在延時,可以這樣驗證:
long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i<5; i++) { System.out.println(tasks.take()); }注意:為了方便查看到效果,可以重寫toString()函數,來保證打印出來的結果有意義:
例如:
DelayQueue可以用在諸如用監控線程來輪詢是否有超時任務出現,來處理某些具有等待時延的情況,這樣,可以避免由于數量巨大造成的輪詢效率差的問題。例如:
實例:
public class T07_DelayQueue {private static BlockingQueue<MyTask> tasks = new DelayQueue<>();private static Random r = new Random();static class MyTask implements Delayed{long runningTime;public MyTask(long rt) {this.runningTime = rt;}@Overridepublic int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {return -1;}else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;}else {return 0;}}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return "" + runningTime;}public static void main(String[] args) throws InterruptedException {long now = System.currentTimeMillis();MyTask t1 = new MyTask(now + 1000);MyTask t2 = new MyTask(now + 2000);MyTask t3 = new MyTask(now + 1500);MyTask t4 = new MyTask(now + 2500);MyTask t5 = new MyTask(now + 500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for (int i = 0; i < 5; i++) {System.out.println(tasks.take());}}} }八、LinkedTransferQueue
TransferQueue是一個繼承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueue是TransferQueue接口的實現類,其定義為一個無界的隊列,具有先進先出(FIFO)的特性。
TransferQueue接口含有下面幾個重要方法:
transfer(E e)
若當前存在一個正在等待獲取的消費者線程,即立刻移交之;否則,會插入當前元素e到隊列尾部,并且等待進入阻塞狀態,到有消費者線程取走該元素。
tryTransfer(E e)
若當前存在一個正在等待獲取的消費者線程(使用take()或者poll()函數),使用該方法會即刻轉移/傳輸對象元素e;若不存在,則返回false,并且不進入隊列。這是一個不阻塞的操作。
tryTransfer(E e,long timeout,TimeUnit unit)
若當前存在一個正在等待獲取的消費者線程,會立即傳輸給它;否則將插入元素e到隊列尾部,并且等待被消費者線程獲取消費掉;若在指定的時間內元素e無法被消費者線程獲取,則返回false,同時該元素被移除。
hasWaitingConsumer()
判斷是否存在消費者線程。
getWaitingConsumerCount()
獲取所有等待獲取元素的消費線程數量。
size()
因為隊列的異步特性,檢測當前隊列的元素個數需要逐一迭代,無法保證原子性,可能會得到一個不太準確的結果,尤其是在遍歷時有可能隊列發生更改。
使用方法:
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();//實例化如果當前沒有消費者線程(存在take方法的線程):
strs.transfer("aaa");該方法會一直阻塞在這里,知道有消費者線程存在。
而如果使用傳統的put()方法來加入元素的話,則不會發生阻塞現象。
同樣,獲取隊列中元素的方法take()也是阻塞在這里等待獲取新的元素的。
九、SynchronousQueue
SynchronousQueue也是一種BlockingQueue,是一種無緩沖的等待隊列。所以,在某次添加元素后必須等待其他線程取走后才能繼續添加;可以認為SynchronousQueue是一個緩存值為0的阻塞隊列(也可以認為是1),它的isEmpty()方法永遠返回是true,remainingCapacity()方法永遠返回是0.
remove()和removeAll()方法永遠返回是false,iterator()方法永遠返回空,peek()方法永遠返回null.
在使用put()方法時,會一直阻塞在這里,等待被消費:
實例:
public class T09_Synchronized {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strings = new SynchronousQueue<String>();new Thread(()->{try {System.out.println(strings.take());} catch (Exception e) {e.printStackTrace();}}).start();strings.put("aaa"); //阻塞等待消費者消費//strings.add("aaa");System.out.println(strings.size());} }參考資料
https://blog.csdn.net/qq_34707744/article/details/79746622 https://blog.csdn.net/wang7807564/article/details/80048576 《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的高并发之并发容器详解(从入门到超神)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringBoot 使用Swagger
- 下一篇: numpy学习(2):数组创建方式