JDK线程池CompletionService的使用
最近使用多線程優化了一個非常耗時的ping任務,下面的是未優化的源代碼,大致就是遍歷es取出的list,然后循環判斷是否能ping通:
SearchResponse searchResponse = client.search(searchRequest); Iterator it = searchResponse.getHits().iterator(); while (it.hasNext()) {boolean isReachAble = false;SearchHit hit = (SearchHit) it.next();Link link = new Link();Map<String, Object> map = hit.getSourceAsMap();String area = (String) map.get("area");if ("上海".equals(area)) {//過濾掉上海地區continue;}List<String> getways = (List<String>) map.get("gateway");//一旦能連通其中任一網關則代表連接成功for (String getway : getways) {//這個isIpReachable很耗時if (isIpReachable(getway)) {isReachAble = true;break;}}link.setArea(area);link.setIsLink(isReachAble ? 1 : 0);links.add(link); }多線程的話現在基本都是直接使用線程池了吧,如下面第一行代碼就能創建一個線程數為4的線程池:
ExecutorService executorService = Executors.newFixedThreadPool(4); CompletionService<String> pool = new ExecutorCompletionService<String>(executorService);第二行的這個CompletionService是我們今天介紹的重點,它與默認的ExecutorService的最大區別就是:
通過executorService來submit的task不一定是按照加入自己維護的list順序完成的;從list中遍歷的每個Future對象并不一定處于完成狀態,這時調用get()方法就會被阻塞住,如果系統是設計成每個線程完成后就能根據其結果繼續做后面的事,這樣對于處于list后面的但是先完成的線程就會增加了額外的等待時間。
而CompletionService的實現是維護一個保存Future對象的BlockingQueue。只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有完成的Future對象加入到Queue中。
所以,先完成的必定先被取出。這樣就減少了不必要的等待時間
好了廢話不多說,直接上優化后的代碼:
//response中包含從es中取到的數據 SearchResponse searchResponse = client.search(searchRequest); Iterator it = searchResponse.getHits().iterator();//創建固定數目線程的線程池 ExecutorService executorService = Executors.newFixedThreadPool(4); CompletionService<String> pool = new ExecutorCompletionService<String>(executorService); List<Future<String>> resultList = new ArrayList<>(); while (it.hasNext()) {SearchHit hit = (SearchHit) it.next();Map<String, Object> map = hit.getSourceAsMap();List<String> getways = (List<String>) map.get("gateway");String area = (String) map.get("area");if ("上海".equals(area)) {//過濾掉上海地區continue;}//將耗時任務submit到線程池中resultList.add(pool.submit(() -> {long t1 = System.currentTimeMillis();boolean isReachAble = false;Link link = new Link();//一旦能連通其中任一網關則代表連接成功for (String getway : getways) {if (isIpReachable(getway)) {isReachAble = true;break;}}link.setArea((String) map.get("area"));link.setIsLink(isReachAble ? 1 : 0);links.add(link);long t2 = System.currentTimeMillis();return "task " + map.get("area") + " completed.耗時:" + (t2 - t1);})); } //如果沒有下面的代碼,主線程將直接返回 for(int i = 0; i < resultList.size(); i++){//在取到數據之前將會一直阻塞String result = pool.take().get();System.out.println(result); }我特地將單線程和多線程運行結果做了個對比,可以看到多線程優化過后時間減了一半之多
下面的是控制臺輸出,發現執行順序確實基本是按時間由短到長,正好體現出了CompletionService的優點
總結
以上是生活随笔為你收集整理的JDK线程池CompletionService的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【时间序列】最完整的时间序列分析和预测(
- 下一篇: 【CV】语义分割:最简单的代码实现!