日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

高并发之并发容器详解(从入门到超神)

發布時間:2025/3/20 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 高并发之并发容器详解(从入门到超神) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、ConcurrentHashMap

在上面已經提到過ConcurrentHashMap,ConcurrentHashMap相比Hashtable能夠進一步提高并發性,其原理圖如下:

HashMap,Hashtable與ConcurrentHashMap都是實現的哈希表數據結構,在隨機讀取的時候效率很高。Hashtable實現同步是利用synchronized關鍵字進行鎖定的,其是針對整張哈希表進行鎖定的,即每次鎖住整張表讓線程獨占,在線程安全的背后是巨大的浪費。ConcurrentHashMap和Hashtable主要區別就是圍繞著鎖的粒度進行區別以及如何區鎖定。

上圖中,左邊是Hashtable的實現方式,可以看到鎖住整個哈希表;而右邊則是ConcurrentHashMap的實現方式,單獨鎖住每一個桶(segment).ConcurrentHashMap將哈希表分為16個桶(默認值),諸如get(),put(),remove()等常用操作只鎖當前需要用到的桶,而size()才鎖定整張表。原來只能一個線程進入,現在卻能同時接受16個寫線程并發進入(寫線程需要鎖定,而讀線程幾乎不受限制),并發性的提升是顯而易見的。

而在迭代時,ConcurrentHashMap使用了不同于傳統集合的快速失敗迭代器(fast-fail iterator)的另一種迭代方式,稱為弱一致迭代器。在這種迭代方式中,當iterator被創建后集合再發生改變就不再是拋出ConcurrentModificationException,取而代之的是在改變時實例化出新的數據從而不影響原有的數據,iterator完成后再將頭指針替換為新的數據,這樣iterator線程可以使用原來老的數據,而寫線程也可以并發的完成改變,更重要的,這保證了多個線程并發執行的連續性和擴展性,是性能提升的關鍵。

我們在上面闡述了ConcurrentHashMap的使用特點和原理,分別在同樣的一個高并發場景下,測試不同的方式產生的延時(ms):

Map<String, String> map = new ConcurrentHashMap<>();//483 Map<String, String> map = new ConcurrentSkipListMap<>(); //高并發并且排序 559 Map<String, String> map = new Hashtable<>(); //499 Map<String, String> map =Collections.synchronizedMap(new HashMap<>()); // 530 Map<String, String> map =Collections.synchronizedMap(new TreeMap()); //905

以ConcurrentLinkedQueue為例,他實現了Queue接口,實例化方式如下:

Queue<String> strs = new ConcurrentLinkedQueue<>();

添加元素的方法:offer()
取出隊頭的方法:poll()
判斷隊列長度:size()
對于雙端隊列,使用ConcurrentLinkedDeque類型來實現.

下面我們再看一個具體的實例:

public class T01_ConcurrentMap {public static void main(String[] args) {Map<String, String> map = new ConcurrentHashMap<String, String>();//Map<String, String> map = new ConcurrentSkipListMap<String, String>(); //高并發并且排序//Map<String, String> map = new Hashtable<>();//Map<String, String> map = new HashMap<String, String>();Random random = new Random();Thread[] threads = new Thread[100];CountDownLatch latch = new CountDownLatch(threads.length);long start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for(int j=0; j<10000;j++) map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));latch.countDown();});}Arrays.asList(threads).forEach(t->t.start());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();System.out.println(end-start);} }

啟動100個線程,向圖中添加100000個元素,分別使用Hashtable,HashMap,ConcurrentHashMap,ConcurrentSkipListMap定義map,判斷程序完成的時間。最終發現,ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是將大鎖分成若干小鎖,實現多個線程共同運行,所以,效率有很大差距。ConcurrentSkipListMap較ConcurrentHashMap除了實現高并發外還能夠排序。

參考:

http://blog.csdn.net/sunxianghuang/article/details/52221913 http://www.educity.cn/java/498061.html

###二、ConcurrentQueue

與ConcurrentHashMap相同,ConcurrentQueue也是通過同樣的方式來提高并發性能的。

我們在同步容器中提到過火車票問題:

有N張火車票,每張票都有一個編號,同時有10個窗口對外售票,寫一個模擬程序。

在上述問題中,也可以使用ConcurrentQueue進一步提高并發性:

static Queue<String> tickets = new ConcurrentLinkedQueue<>();

具體的代碼是這樣的:

public class TicketSeller4 {static Queue<String> tickets = new ConcurrentLinkedQueue<>();static {for(int i=0; i<1000; i++) tickets.add("票編號:" + i);}public static void main(String[] args) {for(int i=0; i<10; i++) {new Thread(()->{while(true) {String s = tickets.poll();if(s == null) break;else System.out.println("銷售了--" + s);}}).start();}} }

這里面通過ConcurrentLinkedQueue的poll()方法來實現獲取容器成員的。用這個類型可以進一步提高并發性。

具體基本操作實例

public class T04_ConcurrentQueue {public static void main(String[] args) {Queue<String> strings = new ConcurrentLinkedQueue<String>();for (int i = 0; i < 10; i++) {strings.offer("a" + i); //相當于add, 放進隊列}System.out.println(strings);System.out.println(strings.size());System.out.println(strings.poll()); //取出并移除掉System.out.println(strings.size());System.out.println(strings.peek()); //取出,不會移除。相當于get(0)System.out.println(strings.size());} }

三、CopyOnWriteArrayList

寫時復制容器,即copy-on-write,在多線程環境下,寫時效率低,讀時效率高,適合寫少讀多的環境。對比測試幾種情況:

List<String> lists = new ArrayList<>(); //這個會出并發問題!報錯:ArrayIndexOutOfBoundsException List<String> lists = new Vector();//111 ms List<String> lists = new CopyOnWriteArrayList<>();//5230 ms //測試核心代碼: Runnable task = new Runnable() { @Override public void run() {for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000)); } }; //多線程向該容器中不斷加入數據。

從JDK 5開始Java并發包里提供了兩個使用CopyOnWrite機制實現的并發容器,它們是CopyOnWriteArrayList和CopyOnWriteArraySet。

當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然后向新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行并發的讀,而不需要加鎖,因為在當前讀的容器中不會添加任何元素。所以CopyOnWrite容器是一種讀寫分離的思想,讀和寫對應不同的容器。

四、BlockingQueue

這種并發容器,會自動實現阻塞式的生產者/消費者模式。使用隊列解耦合,在實現異步事物的時候很有用。下面的例子,實現了阻塞隊列:

LinkedBlockingQueue static BlockingQueue<String> strs = new LinkedBlockingQueue<>(10); strs.put("a" + i); //加入隊列,如果滿了,就會等待 strs.take(); //取出隊列元素,如果空了,就會等待

在實例化時,可以指定具體的隊列容量。
在加入成員的時候,除了使用put方法還可以使用其他方法:

Str.add(“aaa”); /* add如果在隊列滿了之后,再加入成員會拋出異常,而這種情況下,put方法會一直等待被消費掉。 */ Str.offer(“aaa”); /* offer添加成員的時候,會有boolean類型的返回值,如果添加成功,會返回true,如果添加失敗,會返回false.除此之外,offer還可以按時段進行添加,例如: */ strs.offer("aaa", 1, TimeUnit.SECONDS); /* 如果隊列滿了,等待1秒,再進行成員的添加,如果添加失敗了,則返回false. */

五、ArrayBlockingQueue

static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);

對象的方法和上面的BlockingQueue是一樣的,用法也是一樣的。

二者的區別主要是:

  • LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列,在鏈表一頭加入元素,如果隊列滿,就會阻塞,另一頭取出元素,如果隊列為空,就會阻塞。
  • LinkedBlockingQueue內部使用ReentrantLock實現插入鎖(putLock)和取出鎖(takeLock)。
  • 相比于數組實現的ArrayBlockingQueue的有界情況,我們稱之為有界隊列,LinkedBlockingQueue可認為是無界隊列。當然,也可以向上面那樣指定隊列容量,但是這個參數常常是省略的,多用于任務隊列。

    六、LinkedBlockingQueue實例

    public class T05_LinkedBlockingQueue {private static BlockingQueue<String> strings = new LinkedBlockingQueue<String>();private static Random r = new Random();public static void main(String[] args) {new Thread(()->{for (int i = 0; i < 100; i++) {try {strings.put("a" + i); //如果滿了,就會等待TimeUnit.SECONDS.sleep(r.nextInt(10));} catch (Exception e) {e.printStackTrace();}}}, "p1").start();for (int i = 0; i < 5; i++) {new Thread(()->{for(;;){try {System.out.println(Thread.currentThread().getName() + "take -" + strings.take()); //如果空了,就會等待} catch (Exception e) {e.printStackTrace();} }},"c" + i).start();}} }

    LinkedBlockingQueue是使用鏈表是實現的阻塞式容器。

    七、DelayQueue

    DelayQueue也是一個BlockingQueue,其特化的參數是Delayed。
    Delayed擴展了Comparable接口,比較的基準為延時的時間值,Delayed接口的實現類getDelay()的返回值應為固定值(final).DelayQueue內部是使用PriorityQueue實現的,即:

    DelayQueue = BlockingQueue + PriorityQueue + Delayed

    可以說,DelayQueue是一個使用優先隊列(PriorityQueue)實現的BlockingQueue,優先隊列的比較基準值是時間。這是一個無界的BlockingQueue,用于放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。但是要注意的是,不能將null元素放置到這種隊列中。
    Delayed,一種混合風格的接口,用來標記那些應該在給定延遲時間之后執行的對象。此接口的實現類必須重寫一個 compareTo() 方法,該方法提供與此接口的 getDelay()方法一致的排序。
    DelayQueue存儲的對象是實現了Delayed接口的對象,在這個對象中,需要重寫compareTo()和getDelay()方法,例如:

    static class MyTask implements Delayed {long runningTime;MyTask(long rt) {this.runningTime = rt; } @Override public int compareTo(Delayed o) {if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))return -1;else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))return 1; else return 0; } @Override public long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } }

    因此,當我們在main()函數中,向該隊列加入元素后再取出元素的過程,就會存在延時,可以這樣驗證:

    long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i<5; i++) { System.out.println(tasks.take()); }

    注意:為了方便查看到效果,可以重寫toString()函數,來保證打印出來的結果有意義:
    例如:

    @Override public String toString() { return "" + runningTime; }

    DelayQueue可以用在諸如用監控線程來輪詢是否有超時任務出現,來處理某些具有等待時延的情況,這樣,可以避免由于數量巨大造成的輪詢效率差的問題。例如:

  • 關閉空閑連接:服務器中,有很多客戶端的連接,空閑一段時間之后需要關閉他們。
  • 緩存:緩存中的對象,超過了空閑時間,需要從緩存中移出。
  • 任務超時處理:在網絡協議滑動窗口請求應答式交互時,處理超時未響應的請求。
  • 實例:

    public class T07_DelayQueue {private static BlockingQueue<MyTask> tasks = new DelayQueue<>();private static Random r = new Random();static class MyTask implements Delayed{long runningTime;public MyTask(long rt) {this.runningTime = rt;}@Overridepublic int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {return -1;}else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;}else {return 0;}}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return "" + runningTime;}public static void main(String[] args) throws InterruptedException {long now = System.currentTimeMillis();MyTask t1 = new MyTask(now + 1000);MyTask t2 = new MyTask(now + 2000);MyTask t3 = new MyTask(now + 1500);MyTask t4 = new MyTask(now + 2500);MyTask t5 = new MyTask(now + 500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for (int i = 0; i < 5; i++) {System.out.println(tasks.take());}}} }

    八、LinkedTransferQueue

    TransferQueue是一個繼承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueue是TransferQueue接口的實現類,其定義為一個無界的隊列,具有先進先出(FIFO)的特性。
    TransferQueue接口含有下面幾個重要方法:

  • transfer(E e)
    若當前存在一個正在等待獲取的消費者線程,即立刻移交之;否則,會插入當前元素e到隊列尾部,并且等待進入阻塞狀態,到有消費者線程取走該元素。

  • tryTransfer(E e)
    若當前存在一個正在等待獲取的消費者線程(使用take()或者poll()函數),使用該方法會即刻轉移/傳輸對象元素e;若不存在,則返回false,并且不進入隊列。這是一個不阻塞的操作。

  • tryTransfer(E e,long timeout,TimeUnit unit)
    若當前存在一個正在等待獲取的消費者線程,會立即傳輸給它;否則將插入元素e到隊列尾部,并且等待被消費者線程獲取消費掉;若在指定的時間內元素e無法被消費者線程獲取,則返回false,同時該元素被移除。

  • hasWaitingConsumer()
    判斷是否存在消費者線程。

  • getWaitingConsumerCount()
    獲取所有等待獲取元素的消費線程數量。

  • size()
    因為隊列的異步特性,檢測當前隊列的元素個數需要逐一迭代,無法保證原子性,可能會得到一個不太準確的結果,尤其是在遍歷時有可能隊列發生更改。

  • 使用方法:

    LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();//實例化

    如果當前沒有消費者線程(存在take方法的線程):

    strs.transfer("aaa");

    該方法會一直阻塞在這里,知道有消費者線程存在。
    而如果使用傳統的put()方法來加入元素的話,則不會發生阻塞現象。

    strs.take()

    同樣,獲取隊列中元素的方法take()也是阻塞在這里等待獲取新的元素的。

    九、SynchronousQueue

    SynchronousQueue也是一種BlockingQueue,是一種無緩沖的等待隊列。所以,在某次添加元素后必須等待其他線程取走后才能繼續添加;可以認為SynchronousQueue是一個緩存值為0的阻塞隊列(也可以認為是1),它的isEmpty()方法永遠返回是true,remainingCapacity()方法永遠返回是0.
    remove()和removeAll()方法永遠返回是false,iterator()方法永遠返回空,peek()方法永遠返回null.
    在使用put()方法時,會一直阻塞在這里,等待被消費:

    BlockingQueue strs = new SynchronousQueue<>();//實例化 strs.put(“aaa”); //阻塞等待消費者消費 strs.add(“aaa”);//會產生異常,提示隊列滿了 strs.take();//該方法可以取出元素,同樣是阻塞的,需要在線程中去實現他,作為消費者.

    實例:

    public class T09_Synchronized {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strings = new SynchronousQueue<String>();new Thread(()->{try {System.out.println(strings.take());} catch (Exception e) {e.printStackTrace();}}).start();strings.put("aaa"); //阻塞等待消費者消費//strings.add("aaa");System.out.println(strings.size());} }
    參考資料
    https://blog.csdn.net/qq_34707744/article/details/79746622 https://blog.csdn.net/wang7807564/article/details/80048576 《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

    總結

    以上是生活随笔為你收集整理的高并发之并发容器详解(从入门到超神)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。