生活随笔
收集整理的這篇文章主要介紹了
线程同步工具(七)在并发任务间交换数据
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
聲明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 譯者:鄭玉婷
在并發任務間交換數據
Java 并發 API 提供了一種允許2個并發任務間相互交換數據的同步應用。更具體的說,Exchanger 類允許在2個線程間定義同步點,當2個線程到達這個點,他們相互交換數據類型,使用第一個線程的數據類型變成第二個的,然后第二個線程的數據類型變成第一個的。
這個類在遇到類似生產者和消費者問題時,是非常有用的。來一個非常經典的并發問題:你有相同的數據buffer,一個或多個數據生產者,和一個或多個數據消費者。只是Exchange類只能同步2個線程,所以你只能在你的生產者和消費者問題中只有一個生產者和一個消費者時使用這個類。
在這個指南,你將學習如何使用 Exchanger 類來解決只有一個生產者和一個消費者的生產者和消費者問題。
準備
這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,打開它并創建一個新的Java項目。
怎么做呢…
按照這些步驟來實現下面的例子:
| 02 | import java.util.List; |
| 03 | import java.util.concurrent.Exchanger; |
| 05 | //1. 首先,從實現producer開始吧。創建一個類名為Producer并一定實現 Runnable 接口。 |
| 06 | public class Producer implements Runnable { |
| 08 | // 2. 聲明 List<String>對象,名為 buffer。這是等等要被相互交換的數據類型。 |
| 09 | private List<String> buffer; |
| 11 | // 3. 聲明 Exchanger<List<String>>; 對象,名為exchanger。這個 exchanger 對象是用來同步producer和consumer的。 |
| 12 | private final Exchanger<List<String>> exchanger; |
| 14 | // 4. 實現類的構造函數,初始化這2個屬性。 |
| 15 | public Producer(List<String> buffer, Exchanger<List<String>> exchanger) { |
| 17 | this.exchanger = exchanger; |
| 20 | // 5. 實現 run() 方法. 在方法內,實現10次交換。 |
| 24 | for (int i = 0; i < 10; i++) {?????????? System.out.printf("Producer: Cycle %d\n", cycle); |
| 26 | // 6. 在每次循環中,加10個字符串到buffer。 |
| 27 | for (int j = 0; j <10; j++) { |
| 28 | String message = "Event " + ((i * 10) + j); |
| 29 | System.out.printf("Producer: %s\n", message); |
| 33 | // 7. 調用 exchange() 方法來與consumer交換數據。此方法可能會拋出InterruptedException 異常, 加上處理代碼。 |
| 35 | buffer = exchanger.exchange(buffer); |
| 36 | } catch (InterruptedException e) { |
| 39 | System.out.println("Producer: " + buffer.size()); |
| 01 | //8. 現在, 來實現consumer。創建一個類名為Consumer并一定實現 Runnable 接口。 |
| 03 | import java.util.List; |
| 04 | import java.util.concurrent.Exchanger; |
| 05 | public class Consumer implements Runnable { |
| 07 | // 9. 聲明名為buffer的 List<String>對象。這個對象類型是用來相互交換的。 |
| 08 | private List<String> buffer; |
| 10 | // 10. 聲明一個名為exchanger的 Exchanger<List<String>> 對象。用來同步 producer和consumer。 |
| 11 | private final Exchanger<List<String>> exchanger; |
| 13 | // 11. 實現類的構造函數,并初始化2個屬性。 |
| 14 | public Consumer(List<String>buffer, Exchanger<List<String>> exchanger) { |
| 16 | this.exchanger = exchanger; |
| 19 | // 12. 實現 run() 方法。在方法內,實現10次交換。 |
| 23 | for (int i = 0; i < 10; i++) { |
| 24 | System.out.printf("Consumer: Cycle %d\n", cycle); |
| 26 | // 13. 在每次循環,首先調用exchange()方法來與producer同步。Consumer需要消耗數據。此方法可能會拋出InterruptedException異常, 加上處理代碼。 |
| 28 | buffer = exchanger.exchange(buffer); |
| 29 | } catch (InterruptedException e) {????????????? e.printStackTrace(); |
| 32 | // 14. 把producer發來的在buffer里的10字符串寫到操控臺并從buffer內刪除,留空。System.out.println("Consumer: " + buffer.size()); |
| 33 | for (int j = 0; j <10; j++) { |
| 34 | String message = buffer.get(0); |
| 35 | System.out.println("Consumer: " + message); |
?
| 01 | //15.現在,實現例子的主類通過創建一個類,名為Core并加入 main() 方法。 |
| 03 | import java.util.ArrayList; |
| 05 | import java.util.concurrent.Exchanger; |
| 08 | public static void main(String[] args) { |
| 10 | // 16. 創建2個buffers。分別給producer和consumer使用. |
| 11 | List<String> buffer1 = new ArrayList<String>(); |
| 12 | List<String> buffer2 = new ArrayList<String>(); |
| 14 | // 17. 創建Exchanger對象,用來同步producer和consumer。 |
| 15 | Exchanger<List<String>> exchanger = new Exchanger<List<String>>(); |
| 17 | // 18. 創建Producer對象和Consumer對象。 |
| 18 | Producer producer = new Producer(buffer1, exchanger); |
| 19 | Consumer consumer = new Consumer(buffer2, exchanger); |
| 21 | // 19. 創建線程來執行producer和consumer并開始線程。 |
| 22 | Thread threadProducer = new Thread(producer); |
| 23 | Thread threadConsumer = new Thread(consumer); threadProducer.start(); |
| 24 | threadConsumer.start(); |
它是怎么工作的…
消費者開始時是空白的buffer,然后調用Exchanger來與生產者同步。因為它需要數據來消耗。生產者也是從空白的buffer開始,然后創建10個字符串,保存到buffer,并使用exchanger與消費者同步。
在這兒,2個線程(生產者和消費者線程)都是在Exchanger里并交換了數據類型,所以當消費者從exchange() 方法返回時,它有10個字符串在buffer內。當生產者從 exchange() 方法返回時,它有空白的buffer來重新寫入。這樣的操作會重復10遍。
如你執行例子,你會發現生產者和消費者是如何并發的執行任務和在每個步驟它們是如何交換buffers的。與其他同步工具一樣會發生這種情況,第一個調用 exchange()方法會進入休眠直到其他線程的達到。
更多…
Exchanger 類有另外一個版本的exchange方法:
- exchange(V data, long time, TimeUnit unit):V是聲明Phaser的參數種類(例子里是 List)。 此線程會休眠直到另一個線程到達并中斷它,或者特定的時間過去了。TimeUnit類有多種常量:DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 和 SECONDS。
總結
以上是生活随笔為你收集整理的线程同步工具(七)在并发任务间交换数据的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。