日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器

發布時間:2024/1/17 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這篇講講Exchanger交互器,它是一種比較特殊的兩方(Two-Party)柵欄,可以理解成Exchanger是一個柵欄,兩邊一方是生產者,一方是消費者,

1. 生產者和消費者各自維護了一個容器,生產者往容器里生產東西,消費者從容器里消費東西。

2. 當生產者的容器是滿的時候,它需要通過Exchanger向消費者交換,把滿的容器交換給消費者,從消費者手里拿到空的容器繼續生產。

3. 當消費者的容器是空的時候,它需要通過Exchanger向生產者交換,把空的容器交換給生產者,從生產者手里拿到滿的容器繼續消費。

?

所以我們看到這個過程中至少有5個組件

1. Exchanger柵欄

2. 生產者

3. 消費者

4. 生產者的容器

5. 消費者的容器

?

更復雜的情況是生產者有多個人在生產,消費者有多個人在消費,每個人都有自己的容器。這里有一個隱含的意思是生產者和消費者不挑容器,只要是空的或者滿的都能用。Exchanger的匹配是根據Hash來的,所以可能出現不同的人生產者或消費者對應到同一個Hash值。

Exchanger使用了Slot槽來表示一個位置,生產者和消費者都可以被Hash到一個槽中。

?

?
  • private static final class Slot extends AtomicReference<Object> {

  • // Improve likelihood of isolation on <= 64 byte cache lines

  • long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;

  • }

  • ?
  • /**

  • * Slot array. Elements are lazily initialized when needed.

  • * Declared volatile to enable double-checked lazy construction.

  • */

  • private volatile Slot[] arena = new Slot[CAPACITY];


  • 創建了一個內部類Node來封裝要交互者的線程和要交換的容器

    ?

    ?

    ?
  • private static final class Node extends AtomicReference<Object> {

  • /** The element offered by the Thread creating this node. */

  • public final Object item;

  • ?
  • /** The Thread waiting to be signalled; null until waiting. */

  • public volatile Thread waiter;

  • ?
  • /**

  • * Creates node with given item and empty hole.

  • * @param item the item

  • */

  • public Node(Object item) {

  • this.item = item;

  • }

  • }

  • ?

    ?

    算法的主要部分就是交換的過程,下面簡單說說交互的邏輯

    1. 先根據當前線程的id計算出一個Hash值作為索引index

    2. 然后輪詢,如果index對應的Slot槽是null就生成一個,表示還沒有人使用這個槽位

    3.?如果對應的Slot已經有線程了,并且CAS設置它為null也成功了,表示生產者和消費者匹配上了,再通過CAS把自己的item設置給對方Node引用,然后把之前等待的一方喚醒,把對方Node里面的item返回給自己。這樣相當于后來者拿到了之前等待者的item,并把后來者自己的item設置成了之前等待者的Node引用

    當先來者被從自旋狀態喚醒后,會從自己的Node引用中獲取item,如果非空并且不是CANCEL,就證明有人跟它交換了,也拿到了對方的item返回了,否則就是超時取消了

    4. 如果對應的Slot沒有線程,說明它是先來的那個,如果是0號位置的Slot,就進行阻塞,如果是非0的Slot,就自旋,直到超時或取消

    5. 如果一個進入在它自己選擇的槽上CAS失敗,它選擇一個供替代的槽。如果一個線程成功CAS到一個槽但沒有其他線程到達,它嘗試其他,前往 0 號槽

    ?
  • private Object doExchange(Object item, boolean timed, long nanos) {

  • Node me = new Node(item); // Create in case occupying

  • int index = hashIndex(); // Index of current slot

  • int fails = 0; // Number of CAS failures

  • ?
  • for (;;) {

  • Object y; // Contents of current slot

  • Slot slot = arena[index];

  • if (slot == null) // Lazily initialize slots

  • createSlot(index); // Continue loop to reread

  • else if ((y = slot.get()) != null && // Try to fulfill

  • slot.compareAndSet(y, null)) {

  • Node you = (Node)y; // Transfer item

  • if (you.compareAndSet(null, item)) {

  • LockSupport.unpark(you.waiter);

  • return you.item;

  • } // Else cancelled; continue

  • }

  • else if (y == null && // Try to occupy

  • slot.compareAndSet(null, me)) {

  • if (index == 0) // Blocking wait for slot 0

  • return timed ?

  • awaitNanos(me, slot, nanos) :

  • await(me, slot);

  • Object v = spinWait(me, slot); // Spin wait for non-0

  • if (v != CANCEL)

  • return v;

  • me = new Node(item); // Throw away cancelled node

  • int m = max.get();

  • if (m > (index >>>= 1)) // Decrease index

  • max.compareAndSet(m, m - 1); // Maybe shrink table

  • }

  • else if (++fails > 1) { // Allow 2 fails on 1st slot

  • int m = max.get();

  • if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))

  • index = m + 1; // Grow on 3rd failed slot

  • else if (--index < 0)

  • index = m; // Circularly traverse

  • }

  • }

  • }


  • 更多Exchanger算法的細節請參考這篇?http://coderbee.net/index.php/concurrent/20140424/897

    ?

    ?

    下面用一個測試用例來測試Exchanger的功能。最簡單的一個Exchanger的使用場景有5個組件

    1個Exchanger, 1個生產者,1個生產者容器,1個消費者,1個消費者容器

    當生產者把自己的容器生產滿了,就在Exchanger柵欄處等待消費者拿空的容器和它交換

    當消費者把自己的容器消費空了,就在Exchanger柵欄處等待生產者拿滿的容器和它交換

    ?

    ?
  • package com.lock.test;

  • ?
  • import java.util.concurrent.Exchanger;

  • ?
  • public class ExchangerUsecase {

  • private static Exchanger<Buffer<Integer>> exchanger = new Exchanger<Buffer<Integer>>();

  • private static Buffer<Integer> emptyBuffer = new Buffer<Integer>();

  • private static Buffer<Integer> fullBuffer = new Buffer<Integer>();

  • ?
  • private static class Buffer<T>{

  • private T[] cache = (T[])(new Object[2]);

  • private int index = 0;

  • ?
  • public void add(T item){

  • cache[index++] = item;

  • }

  • ?
  • public T take(){

  • return cache[--index];

  • }

  • ?
  • public boolean isEmpty(){

  • return index == 0;

  • }

  • ?
  • public boolean isFull(){

  • return index == cache.length;

  • }

  • }

  • ?
  • public static void main(String[] args){

  • Runnable provider = new Runnable(){

  • Buffer<Integer> currentBuffer = emptyBuffer;

  • private int exchangeCount = 0;

  • @Override

  • public void run() {

  • while(currentBuffer != null && exchangeCount <= 1){

  • if(!currentBuffer.isFull()){

  • System.out.println("Provider added one item");

  • currentBuffer.add(1);

  • }else{

  • try {

  • currentBuffer = exchanger.exchange(currentBuffer);

  • exchangeCount ++;

  • Thread.sleep(2000);

  • } catch (InterruptedException e) {

  • e.printStackTrace();

  • }

  • }

  • }

  • ?
  • }

  • ?
  • };

  • ?
  • Runnable consumer = new Runnable(){

  • Buffer<Integer> currentBuffer = fullBuffer;

  • private int exchangeCount = 0;

  • @Override

  • public void run() {

  • while(currentBuffer != null && exchangeCount <= 2){

  • if(!currentBuffer.isEmpty()){

  • System.out.println("Consumer took one item");

  • currentBuffer.take();

  • }else{

  • try {

  • currentBuffer = exchanger.exchange(currentBuffer);

  • exchangeCount ++;

  • } catch (InterruptedException e) {

  • e.printStackTrace();

  • }

  • }

  • }

  • ?
  • }

  • ?
  • };

  • ?
  • new Thread(provider).start();

  • new Thread(consumer).start();

  • }

  • }

  • ?
  • private static Object spinWait(Node node, Slot slot) {

  • ??????? int spins = SPINS;

  • ??????? for (;;) {

  • ??????????? Object v = node.get();

  • ??????????? if (v != null)

  • ??????????????? return v;

  • ??????????? else if (spins > 0)

  • ??????????????? --spins;

  • ??????????? else

  • ??????????????? tryCancel(node, slot);

  • ??????? }

  • ??? }?


  • 測試結果顯示生產者先生成了兩個,然后滿了,就等待消費者和它交換。交換后消費者消費了兩個,再次等待交換。生產者又生成滿了一次,再次交換。如果不設置退出機制,雙方會一直生產和消費下去,所以在測試用例中限制了交換兩次

    ?

    ?

    ?
  • Provider added one item

  • Provider added one item

  • Consumer took one item

  • Consumer took one item

  • Provider added one item

  • Provider added one item

  • Consumer took one item

  • Consumer took one item

  • 總結

    以上是生活随笔為你收集整理的聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。