日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

mysql异步查询 java_java 手写并发框架(一)异步查询转同步的 7 种实现方式

發(fā)布時間:2024/1/23 数据库 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mysql异步查询 java_java 手写并发框架(一)异步查询转同步的 7 种实现方式 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

序言

本節(jié)將學(xué)習(xí)一下如何實現(xiàn)異步查詢轉(zhuǎn)同步的方式,共計介紹了 7 種常見的實現(xiàn)方式。

思維導(dǎo)圖如下:

異步轉(zhuǎn)同步

業(yè)務(wù)需求

有些接口查詢反饋結(jié)果是異步返回的,無法立刻獲取查詢結(jié)果。

比如業(yè)務(wù)開發(fā)中我們調(diào)用其他系統(tǒng),但是結(jié)果的返回確實通知的。

或者 rpc 實現(xiàn)中,client 調(diào)用 server 端,結(jié)果也是異步返回的,那么如何同步獲取調(diào)用結(jié)果呢?

正常處理邏輯

觸發(fā)異步操作,然后傳遞一個唯一標(biāo)識。

等到異步結(jié)果返回,根據(jù)傳入的唯一標(biāo)識,匹配此次結(jié)果。

如何轉(zhuǎn)換為同步

正常的應(yīng)用場景很多,但是有時候不想做數(shù)據(jù)存儲,只是想簡單獲取調(diào)用結(jié)果。

即想達(dá)到同步操作的結(jié)果,怎么辦呢?

思路

發(fā)起異步操作

在異步結(jié)果返回之前,一直等待(可以設(shè)置超時)

結(jié)果返回之后,異步操作結(jié)果統(tǒng)一返回

常見的實現(xiàn)方式

循環(huán)等待

wait & notify

使用條件鎖

使用 CountDownLatch

使用 CyclicBarrier

Future

Spring EventListener

下面我們一起來學(xué)習(xí)下這幾種實現(xiàn)方式。

循環(huán)等待

說明

循環(huán)等待是最簡單的一種實現(xiàn)思路。

我們調(diào)用對方一個請求,在沒有結(jié)果之前一直循環(huán)查詢即可。

這個結(jié)果可以在內(nèi)存中,也可以放在 redis 緩存或者 mysql 等數(shù)據(jù)庫中。

代碼實現(xiàn)

定義抽象父類

為了便于后面的其他幾種實現(xiàn)方式統(tǒng)一,我們首先定義一個抽象父類。

/**

* 抽象查詢父類

* @author binbin.hou

* @since 1.0.0

*/

public abstract class AbstractQuery {

private static final Log log = LogFactory.getLog(AbstractQuery.class);

protected String result;

public void asyncToSync() {

startQuery();

new Thread(new Runnable() {

public void run() {

remoteCall();

}

}).start();

endQuery();

}

protected void startQuery() {

log.info("開始查詢...");

}

/**

* 遠(yuǎn)程調(diào)用

*/

protected void remoteCall() {

try {

log.info("遠(yuǎn)程調(diào)用開始");

TimeUnit.SECONDS.sleep(5);

result = "success";

log.info("遠(yuǎn)程調(diào)用結(jié)束");

} catch (InterruptedException e) {

log.error("遠(yuǎn)程調(diào)用失敗", e);

}

}

/**

* 查詢結(jié)束

*/

protected void endQuery() {

log.info("完成查詢,結(jié)果為:" + result);

}

}

代碼實現(xiàn)

實現(xiàn)還是非常簡單的,在沒有結(jié)果之前一直循環(huán)。

TimeUnit.MILLISECONDS.sleep(10); 這里循環(huán)等待的小睡一會兒是比較重要的,避免 cpu 飆升,也可以降低為 1ms,根據(jù)自己的業(yè)務(wù)調(diào)整即可。

/**

* 循環(huán)等待

* @author binbin.hou

* @since 1.0.0

*/

public class LoopQuery extends AbstractQuery {

private static final Log log = LogFactory.getLog(LoopQuery.class);

@Override

protected void endQuery() {

try {

while (StringUtil.isEmpty(result)) {

//循環(huán)等待一下

TimeUnit.MILLISECONDS.sleep(10);

}

//獲取結(jié)果

log.info("完成查詢,結(jié)果為:" + result);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

測試

LoopQuery loopQuery = new LoopQuery();

loopQuery.asyncToSync();

日志

[INFO] [2020-10-08 09:50:43.330] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...

[INFO] [2020-10-08 09:50:43.331] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始

[INFO] [2020-10-08 09:50:48.334] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束

[INFO] [2020-10-08 09:50:48.343] [main] [c.g.h.s.t.d.LoopQuery.endQuery] - 完成查詢,結(jié)果為:success

這里可以看到遠(yuǎn)程調(diào)用是 Thread-0 線程執(zhí)行的,遠(yuǎn)程調(diào)用的耗時為 5S。

超時特性

為什么需要超時時間

上面的實現(xiàn)存在一個問題,那就是循環(huán)等待沒有超時時間。

我們的一個網(wǎng)絡(luò)請求,可能存在失敗,也可能對方收到請求之后沒有正確處理。

所以如果我們一直等待,可能永遠(yuǎn)也沒有結(jié)果,或者很久之后才有結(jié)果。這在業(yè)務(wù)上是不可忍受的,所以需要添加一個超時時間。

代碼實現(xiàn)

/**

* 循環(huán)等待-包含超時時間

* @author binbin.hou

* @since 1.0.0

*/

public class LoopTimeoutQuery extends AbstractQuery {

private static final Log log = LogFactory.getLog(LoopTimeoutQuery.class);

/**

* 超時時間

*/

private long timeoutMills = 3000;

public LoopTimeoutQuery() {

}

public LoopTimeoutQuery(long timeoutMills) {

this.timeoutMills = timeoutMills;

}

@Override

protected void endQuery() {

try {

final long endTimeMills = System.currentTimeMillis() + timeoutMills;

while (StringUtil.isEmpty(result)) {

// 超時判斷

if(System.currentTimeMillis() >= endTimeMills) {

throw new RuntimeException("請求超時");

}

//循環(huán)等待一下

TimeUnit.MILLISECONDS.sleep(10);

}

//獲取結(jié)果

log.info("完成查詢,結(jié)果為:" + result);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

測試

LoopTimeoutQuery loopQuery = new LoopTimeoutQuery();

loopQuery.asyncToSync();

日志如下:

[INFO] [2020-10-08 10:04:58.091] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...

[INFO] [2020-10-08 10:04:58.092] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始

Exception in thread "main" java.lang.RuntimeException: 請求超時

at com.github.houbb.sync.test.demo.LoopTimeoutQuery.endQuery(LoopTimeoutQuery.java:38)

at com.github.houbb.sync.test.demo.AbstractQuery.asyncToSync(AbstractQuery.java:26)

at com.github.houbb.sync.test.demo.LoopTimeoutQuery.main(LoopTimeoutQuery.java:55)

[INFO] [2020-10-08 10:05:03.097] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束

超時時間是可以設(shè)定的,平時開發(fā)中可以根據(jù)自己的響應(yīng)時間設(shè)置。

如果請求超時,考慮對應(yīng)的兜底方案。

基于 wait() & notifyAll()

簡介

實際上 loop 循環(huán)還是比較消耗性能的,對于這種等待特性, jdk 實際上為我們封裝了多種特性。

比如最常見的 wait() 進(jìn)入等待,notifyAll() 喚醒等待的組合方式。

這個同時也是阻塞隊列的實現(xiàn)思想,阻塞隊列我們就不介紹了,我們來看一下 wait+notify 的實現(xiàn)方式。

java 實現(xiàn)

package com.github.houbb.sync.test.demo;

import com.github.houbb.log.integration.core.Log;

import com.github.houbb.log.integration.core.LogFactory;

/**

* wait+notify 實現(xiàn)

* @author binbin.hou

* @since 1.0.0

*/

public class WaitNotifyQuery extends AbstractQuery {

private static final Log log = LogFactory.getLog(WaitNotifyQuery.class);

/**

* 聲明對象

*/

private final Object lock = new Object();

@Override

protected void remoteCall() {

super.remoteCall();

synchronized (lock) {

log.info("遠(yuǎn)程線程執(zhí)行完成,喚醒所有等待。");

lock.notifyAll();

}

}

@Override

protected void endQuery() {

try {

// 等待 10s

synchronized (lock) {

log.info("主線程進(jìn)入等待");

lock.wait(10 * 1000);

}

} catch (InterruptedException e) {

e.printStackTrace();

}

super.endQuery();

}

public static void main(String[] args) {

WaitNotifyQuery query = new WaitNotifyQuery();

query.asyncToSync();

}

}

注意:編程時需要使用 synchronized 保證鎖的持有者線程安全,不然會報錯。

測試

日志如下:

[INFO] [2020-10-08 11:05:50.769] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...

[INFO] [2020-10-08 11:05:50.770] [main] [c.g.h.s.t.d.WaitNotifyQuery.endQuery] - 主線程進(jìn)入等待

[INFO] [2020-10-08 11:05:50.770] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始

[INFO] [2020-10-08 11:05:55.772] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束

[INFO] [2020-10-08 11:05:55.773] [Thread-0] [c.g.h.s.t.d.WaitNotifyQuery.remoteCall] - 遠(yuǎn)程線程執(zhí)行完成,喚醒所有等待。

[INFO] [2020-10-08 11:05:55.773] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查詢,結(jié)果為:success

基于條件鎖的實現(xiàn)

條件鎖簡介

如果你想編寫一個含有多個條件謂詞的并發(fā)對象,或者你想獲得比條件隊列的可見性之外更多的控制權(quán),那么顯式的Lock和Condition的實現(xiàn)類提供了一個比內(nèi)部鎖和條件隊列更加靈活的選擇。

如同Lock提供了比內(nèi)部加鎖要豐富得多的特征集一樣,Condition也提供了比內(nèi)部條件隊列要豐富得多的特征集:

每個鎖可以有多個等待集(因await掛起的線程的集合)、可中斷/不可中斷的條件等待、基于時限的等待以及公平/非公平隊列之間的選擇.

注意事項:

wait、notify和notifyAll在Condition對象中的對等體是await、signal和signalAll.

但是,Condition繼承與Object,這意味著它也有wait和notify方法.

一定要確保使用了正確的版本–await和signal!

java 實現(xiàn)

為了演示簡單,我們直接選擇可重入鎖即可。

一個Condition和一個單獨的Lock相關(guān)聯(lián),就像條件隊列和單獨的內(nèi)部鎖相關(guān)聯(lián)一樣;

調(diào)用與Condition相關(guān)聯(lián)的Lock的Lock.newCondition方法,可以創(chuàng)建一個Condition.

package com.github.houbb.sync.test.demo;

import com.github.houbb.log.integration.core.Log;

import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

/**

* 條件鎖實現(xiàn)

* @author binbin.hou

* @since 1.0.0

*/

public class LockConditionQuery extends AbstractQuery {

private static final Log log = LogFactory.getLog(LockConditionQuery.class);

private final Lock lock = new ReentrantLock();

private final Condition condition = lock.newCondition();

@Override

protected void remoteCall() {

lock.lock();

try{

super.remoteCall();

log.info("遠(yuǎn)程線程執(zhí)行完成,喚醒所有等待線程。");

condition.signalAll();

} finally {

lock.unlock();

}

}

@Override

protected void endQuery() {

lock.lock();

try{

// 等待

log.info("主線程進(jìn)入等待");

condition.await();

super.endQuery();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

lock.unlock();

}

}

public static void main(String[] args) {

LockConditionQuery query = new LockConditionQuery();

query.asyncToSync();

}

}

實現(xiàn)也比較簡單,我們在方法進(jìn)入,調(diào)用 lock.lock() 加鎖,finally 中調(diào)用 lock.unlock() 釋放鎖。

condition.await(); 進(jìn)入等待;condition.signalAll(); 喚醒所有等待線程。

測試日志

[INFO] [2020-10-08 12:33:40.985] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...

[INFO] [2020-10-08 12:33:40.986] [main] [c.g.h.s.t.d.LockConditionQuery.endQuery] - 主線程進(jìn)入等待

[INFO] [2020-10-08 12:33:40.987] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始

[INFO] [2020-10-08 12:33:45.990] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束

[INFO] [2020-10-08 12:33:45.991] [Thread-0] [c.g.h.s.t.d.LockConditionQuery.remoteCall] - 遠(yuǎn)程線程執(zhí)行完成,喚醒所有等待線程。

[INFO] [2020-10-08 12:33:45.993] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查詢,結(jié)果為:success

CountDownLatch 閉鎖實現(xiàn)

CountDownLatch/Future/CyclicBarrier 這三個都是 jdk 為我們提供的同步工具類,我們此處只做簡單介紹。

詳情參見:

CountDownLatch 簡介

閉鎖是一種同步工具類,可以延遲線程的進(jìn)度直到其達(dá)到終止?fàn)顟B(tài)。

閉鎖的作用相當(dāng)于一扇門:在閉鎖到達(dá)結(jié)束狀態(tài)之前,這扇門一直是關(guān)閉的,并且沒有任何線程能通過,當(dāng)?shù)竭_(dá)結(jié)束狀態(tài)時,這扇門會打開并允許所有的線程通過。

當(dāng)閉鎖到達(dá)結(jié)束狀態(tài)后,將不會再改變狀態(tài),因此這扇門將永遠(yuǎn)保持打開狀態(tài)。

閉鎖可以用來確保某些活動直到其它活動都完成后才繼續(xù)執(zhí)行。

java 代碼實現(xiàn)

import com.github.houbb.log.integration.core.Log;

import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

/**

* CountDownLatch 實現(xiàn)

* @author binbin.hou

* @since 1.0.0

*/

public class CountDownLatchQuery extends AbstractQuery {

private static final Log log = LogFactory.getLog(CountDownLatchQuery.class);

/**

* 閉鎖

* 調(diào)用1次,后續(xù)方法即可通行。

*/

private final CountDownLatch countDownLatch = new CountDownLatch(1);

@Override

protected void remoteCall() {

super.remoteCall();

// 調(diào)用一次閉鎖

countDownLatch.countDown();

}

@Override

protected void endQuery() {

try {

// countDownLatch.await();

countDownLatch.await(10, TimeUnit.SECONDS);

log.info("完成查詢,結(jié)果為:" + result);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

public static void main(String[] args) {

CountDownLatchQuery loopQuery = new CountDownLatchQuery();

loopQuery.asyncToSync();

}

}

我們在返回結(jié)果之前調(diào)用 countDownLatch.await(10, TimeUnit.SECONDS); 進(jìn)行等待,這里可以指定超時時間。

remoteCall() 遠(yuǎn)程完成后,執(zhí)行一下 countDownLatch.countDown();,進(jìn)而可以讓程序繼續(xù)執(zhí)行下去。

測試

代碼

CountDownLatchQuery loopQuery = new CountDownLatchQuery();

loopQuery.asyncToSync();

日志

[INFO] [2020-10-08 10:24:03.348] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...

[INFO] [2020-10-08 10:24:03.350] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始

[INFO] [2020-10-08 10:24:08.353] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束

[INFO] [2020-10-08 10:24:08.354] [main] [c.g.h.s.t.d.CountDownLatchQuery.endQuery] - 完成查詢,結(jié)果為:success

jdk 提供的閉鎖功能還是非常的方便的。

CyclicBarrier 柵欄

簡介

柵欄(Barrier)類似于閉鎖,它能阻塞一組線程直到某個事件發(fā)生[CPJ 4.4.3]。閉鎖是一次性對象,一旦進(jìn)入最終狀態(tài),就不能被重置了。

柵欄與閉鎖的關(guān)鍵區(qū)別在于,所有線程必須同時達(dá)到柵欄位置,才能繼續(xù)執(zhí)行。閉鎖用于等待事件,而柵欄用于等待其他線程。

java 實現(xiàn)

package com.github.houbb.sync.test.demo;

import com.github.houbb.log.integration.core.Log;

import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

/**

* CyclicBarrier 實現(xiàn)

* @author binbin.hou

* @since 1.0.0

*/

public class CyclicBarrierQuery extends AbstractQuery {

private static final Log log = LogFactory.getLog(CyclicBarrierQuery.class);

private CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

@Override

protected void remoteCall() {

super.remoteCall();

try {

cyclicBarrier.await();

log.info("遠(yuǎn)程調(diào)用進(jìn)入等待");

} catch (InterruptedException | BrokenBarrierException e) {

e.printStackTrace();

}

}

@Override

protected void endQuery() {

try {

cyclicBarrier.await();

log.info("主線程進(jìn)入等待");

} catch (InterruptedException | BrokenBarrierException e) {

e.printStackTrace();

}

super.endQuery();

}

}

測試

代碼

public static void main(String[] args) {

CyclicBarrierQuery cyclicBarrierQuery = new CyclicBarrierQuery();

cyclicBarrierQuery.asyncToSync();

}

日志

[INFO] [2020-10-08 10:39:00.890] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...

[INFO] [2020-10-08 10:39:00.892] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用開始

[INFO] [2020-10-08 10:39:05.894] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 遠(yuǎn)程調(diào)用結(jié)束

[INFO] [2020-10-08 10:39:05.895] [Thread-0] [c.g.h.s.t.d.CyclicBarrierQuery.remoteCall] - 遠(yuǎn)程調(diào)用進(jìn)入等待

[INFO] [2020-10-08 10:39:05.895] [main] [c.g.h.s.t.d.CyclicBarrierQuery.endQuery] - 主線程進(jìn)入等待

[INFO] [2020-10-08 10:39:05.896] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查詢,結(jié)果為:success

可以看出遠(yuǎn)程線程 Thread-0 執(zhí)行完之后就進(jìn)入等待,此時主線程調(diào)用,然后也進(jìn)入等待。

等主線程 endQuery 等待時,就滿足了兩個線程同時等待,然后執(zhí)行就結(jié)束了。

基于 Future 實現(xiàn)

Future 簡介

Future模式可以這樣來描述:我有一個任務(wù),提交給了Future,Future替我完成這個任務(wù)。期間我自己可以去做任何想做的事情。一段時間之后,我就便可以從Future那兒取出結(jié)果。就相當(dāng)于下了一張訂貨單,一段時間后可以拿著提訂單來提貨,這期間可以干別的任何事情。其中Future 接口就是訂貨單,真正處理訂單的是Executor類,它根據(jù)Future接口的要求來生產(chǎn)產(chǎn)品。

Future接口提供方法來檢測任務(wù)是否被執(zhí)行完,等待任務(wù)執(zhí)行完獲得結(jié)果,也可以設(shè)置任務(wù)執(zhí)行的超時時間。這個設(shè)置超時的方法就是實現(xiàn)Java程序執(zhí)行超時的關(guān)鍵。

詳細(xì)介紹:

java 代碼實現(xiàn)

采用 Future 返回和以前的實現(xiàn)差異較大,我們直接覆寫以前的方法即可。

import com.github.houbb.log.integration.core.Log;

import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.*;

/**

* Future 實現(xiàn)

* @author binbin.hou

* @since 1.0.0

*/

public class FutureQuery extends AbstractQuery {

private static final Log log = LogFactory.getLog(FutureQuery.class);

private final ExecutorService executorService = Executors.newSingleThreadExecutor();

@Override

public void asyncToSync() {

//1. 開始調(diào)用

super.startQuery();

//2. 遠(yuǎn)程調(diào)用

Future stringFuture = remoteCallFuture();

//3. 完成結(jié)果

try {

String result = stringFuture.get(10, TimeUnit.SECONDS);

log.info("調(diào)用結(jié)果:{}", result);

} catch (InterruptedException | TimeoutException | ExecutionException e) {

e.printStackTrace();

}

}

/**

* 遠(yuǎn)程調(diào)用

* @return Future 信息

*/

private Future remoteCallFuture() {

FutureTask futureTask = new FutureTask<>(new Callable() {

@Override

public String call() throws Exception {

log.info("開始異步調(diào)用");

TimeUnit.SECONDS.sleep(5);

log.info("完成異步調(diào)用");

return "success";

}

});

executorService.submit(futureTask);

// 關(guān)閉線程池

executorService.shutdown();

return futureTask;

}

public static void main(String[] args) {

FutureQuery query = new FutureQuery();

query.asyncToSync();

}

}

遠(yuǎn)程調(diào)用執(zhí)行時,是一個 FutureTask,然后提交到線程池去執(zhí)行。

獲取結(jié)果的時候,stringFuture.get(10, TimeUnit.SECONDS) 可以指定獲取的超時時間。

日志

測試日志如下:

[INFO] [2020-10-08 12:52:05.175] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 開始查詢...

[INFO] [2020-10-08 12:52:05.177] [pool-1-thread-1] [c.g.h.s.t.d.FutureQuery.call] - 開始異步調(diào)用

[INFO] [2020-10-08 12:52:10.181] [pool-1-thread-1] [c.g.h.s.t.d.FutureQuery.call] - 完成異步調(diào)用

[INFO] [2020-10-08 12:52:10.185] [main] [c.g.h.s.t.d.FutureQuery.asyncToSync] - 調(diào)用結(jié)果:success

Spring EventListener

spring 事件監(jiān)聽器模式

對于一件事情完成的結(jié)果調(diào)用,使用觀察者模式是非常適合的。

spring 為我們提供了比較強(qiáng)大的監(jiān)聽機(jī)制,此處演示下結(jié)合 spring 使用的例子。

ps: 這個例子是2年前的自己寫的例子了,此處為了整個系列的完整性,直接搬過來作為補(bǔ)充。

代碼實現(xiàn)

BookingCreatedEvent.java

定義一個傳輸屬性的對象。

public class BookingCreatedEvent extends ApplicationEvent {

private static final long serialVersionUID = -1387078212317348344L;

private String info;

public BookingCreatedEvent(Object source) {

super(source);

}

public BookingCreatedEvent(Object source, String info) {

super(source);

this.info = info;

}

public String getInfo() {

return info;

}

}

BookingService.java

說明:當(dāng) this.context.publishEvent(bookingCreatedEvent); 觸發(fā)時,

會被 @EventListener 指定的方法監(jiān)聽到。

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.ApplicationContext;

import org.springframework.context.event.EventListener;

import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service

public class BookingService {

@Autowired

private ApplicationContext context;

private volatile BookingCreatedEvent bookingCreatedEvent;

/**

* 異步轉(zhuǎn)同步查詢

* @param info

* @return

*/

public String asyncQuery(final String info) {

query(info);

new Thread(new Runnable() {

@Override

public void run() {

remoteCallback(info);

}

}).start();

while(bookingCreatedEvent == null) {

//.. 空循環(huán)

// 短暫等待。

try {

TimeUnit.MILLISECONDS.sleep(1);

} catch (InterruptedException e) {

//...

}

//2. 使用兩個單獨的 event...

}

final String result = bookingCreatedEvent.getInfo();

bookingCreatedEvent = null;

return result;

}

@EventListener

public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) {

System.out.println("監(jiān)聽到遠(yuǎn)程的信息: " + bookingCreatedEvent.getInfo());

this.bookingCreatedEvent = bookingCreatedEvent;

System.out.println("監(jiān)聽到遠(yuǎn)程消息后: " + this.bookingCreatedEvent.getInfo());

}

/**

* 執(zhí)行查詢

* @param info

*/

public void query(final String info) {

System.out.println("開始查詢: " + info);

}

/**

* 遠(yuǎn)程回調(diào)

* @param info

*/

public void remoteCallback(final String info) {

System.out.println("遠(yuǎn)程回調(diào)開始: " + info);

try {

TimeUnit.SECONDS.sleep(2);

} catch (InterruptedException e) {

e.printStackTrace();

}

// 重發(fā)結(jié)果事件

String result = info + "-result";

BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result);

//觸發(fā)event

this.context.publishEvent(bookingCreatedEvent);

}

}

測試方法

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(classes = SpringConfig.class)

public class BookServiceTest {

@Autowired

private BookingService bookingService;

@Test

public void asyncQueryTest() {

bookingService.asyncQuery("1234");

}

}

日志

2018-08-10 18:27:05.958 INFO [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:84 - 開始查詢:1234

2018-08-10 18:27:05.959 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:93 - 遠(yuǎn)程回調(diào)開始:1234

接收到信息: 1234-result

2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:73 - 監(jiān)聽到遠(yuǎn)程的信息: 1234-result

2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:75 - 監(jiān)聽到遠(yuǎn)程消息后: 1234-result

2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:106 - 已經(jīng)觸發(fā)event

2018-08-10 18:27:07.964 INFO [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:67 - 查詢結(jié)果: 1234-result

2018-08-10 18:27:07.968 INFO [Thread-1] org.springframework.context.support.GenericApplicationContext:993 - Closing org.springframework.context.support.GenericApplicationContext@5cee5251: startup date [Fri Aug 10 18:27:05 CST 2018]; root of context hierarchy

小結(jié)

本文共計介紹了 7 種異步轉(zhuǎn)同步的方式,實際上思想都是一樣的。

在異步執(zhí)行完成前等待,執(zhí)行完成后喚醒等待即可。

當(dāng)然我寫本文除了總結(jié)以上幾種方式以外,還想為后續(xù)寫一個異步轉(zhuǎn)同步的工具提供基礎(chǔ)。

下一節(jié)我們將一起學(xué)習(xí)下如何將這個功能封裝為一個同步轉(zhuǎn)換框架,感興趣的可以關(guān)注一下,便于實時接收最新內(nèi)容。

覺得本文對你有幫助的話,歡迎點贊評論收藏轉(zhuǎn)發(fā)一波。你的鼓勵,是我最大的動力~

不知道你有哪些收獲呢?或者有其他更多的想法,歡迎留言區(qū)和我一起討論,期待與你的思考相遇。

代碼地址

為了便于學(xué)習(xí),文中的所有例子都已經(jīng)開源:

實現(xiàn) 1-6:sync

總結(jié)

以上是生活随笔為你收集整理的mysql异步查询 java_java 手写并发框架(一)异步查询转同步的 7 种实现方式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。