日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java Review - Java进程内部的消息中间件_Event Bus设计模式

發布時間:2025/3/21 java 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java Review - Java进程内部的消息中间件_Event Bus设计模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • EventBus架構類圖
  • Code
    • Bus接口 (定義注冊topic以及發送event接口)
    • 自定義注解-回調方法及topic
    • 同步EventBus
    • 異步EventBus
    • Subscriber注冊表Registry (維護topic和subscriber之間的關系)
    • Event廣播Dispatcher
    • 其他類接口設計
    • 測試
      • 同步&異步 Event Bus


概述

在工作中,我們都會使用到MQ 比如 Apache Kafka等,某subscriber在消息中間件上注冊了某個topic(主題),當有消息發送到了該topic上之后,注冊在該topic上的所有subscriber都將會收到消息 。

消息中間件提供了系統之間的異步處理機制。 主業務完成后即可向用戶返回成功的通知,然后提交各種消息至消息中間件,這樣注冊在消息中間件的其他系統就可以順利地接收通知了,然后執行各自的業務邏輯。消息中間件主要用于解決進程之間消息異步處理的解決方案,這里,我們使用消息中間件的思想設計一個Java進程內部的消息中間件——Event Bus


EventBus架構類圖

  • Bus接口對外提供了幾種主要的使用方式,比如post方法用來發送Event,register方法用來注冊Event接收者(Subscriber)接受響應事件

  • EventBus采用同步的方式推送Event,AsyncEventBus采用異步的方式(Thread-Per-Message)推送Event。

  • Registry注冊表,主要用來記錄對應的Subscriber以及受理消息的回調方法,回調方法我們用注解@Subscribe來標識。

  • Dispatcher主要用來將event廣播給注冊表中監聽了topic的Subscriber


Code

Bus接口 (定義注冊topic以及發送event接口)

/*** Bus接口定義了EventBus的所有使用方法** @author artisan*/ public interface Bus {/*** 將某個對象注冊到Bus上,從此之后該類就成為Subscriber了*/void register(Object subscriber);/*** 將某個對象從Bus上取消注冊,取消注冊之后就不會再接收到來自Bus的任何消息*/void unregister(Object subscriber);/*** 提交Event到默認的topic*/void post(Object event);/*** 提交Event到指定的topic*/void post(Object event, String topic);/*** 關閉該bus*/void close();/*** 返回Bus的名稱標識*/String getBusName();}

Bus接口中定義了注冊topic的方法和Event發送的方法

  • register(Object subscriber):將某個對象實例注冊給Event Bus。

  • unregister(Object subscriber):取消對該對象實例的注冊,會在Event Bus的注冊表(Registry)中將其移除。

  • post(Object event):提交Event到Event Bus中,如果未指定topic則會將event廣播給Event Bus默認的topic。

  • post(Object event, String topic) :提交Event的同時指定了topic。

  • close():銷毀該Event Bus。

  • getBusName():返回該Event Bus的名稱


自定義注解-回調方法及topic

注冊對象給Event Bus的時候需要指定接收消息時的回調方法,我們采用注解的方式進行Event回調

import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;/*** @author artisan*/ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Subscribe {String topic() default "default-topic"; }

@Subscribe要求注解在類中的方法,注解時可指定topic,不指定的情況下為默認的topic 【default-topic】


同步EventBus

同步EventBus是最核心的一個類,它實現了Bus的所有功能,但是該類對Event的廣播推送采用的是同步的方式

/*** @author 小工匠* @version 1.0* @description: Bus實現類* @date 2021/12/1 23:00* @mark: show me the code , change the world*/ public class EventBus implements Bus {/*** 用于維護Subscriber的注冊表*/private final Registry registry = new Registry();/*** Event Bus的名字*/private String busName;/*** 默認的Event Bus的名字*/private final static String DEFAULT_BUS_NAME = "default";/*** 默認的topic的名字*/private final static String DEFAULT_TOPIC = "default-topic";/*** 用于分發廣播消息到各個Subscriber的類*/private final Dispatcher dispatcher;/*** 構造函數*/public EventBus() {this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);}public EventBus(String busName) {this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);}EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) {this.busName = busName;this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);}public EventBus(EventExceptionHandler exceptionHandler) {this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);}/*** 將注冊Subscriber的動作直接委托給Registry** @param subscriber*/@Overridepublic void register(Object subscriber) {this.registry.bind(subscriber);}/*** 接觸注冊同樣委托給Registry** @param subscriber*/@Overridepublic void unregister(Object subscriber) {this.registry.unbind(subscriber);}/*** 提交Event到默認的topic** @param event*/@Overridepublic void post(Object event) {this.post(event, DEFAULT_TOPIC);}/*** 提交Event到指定的topic,具體的動作是由Dispatcher來完成的** @param event* @param topic*/@Overridepublic void post(Object event, String topic) {this.dispatcher.dispatch(this, registry, event, topic);}/*** 關閉銷毀Bus*/@Overridepublic void close() {this.dispatcher.close();}/*** 返回Bus的名稱** @return*/@Overridepublic String getBusName() {return this.busName;} }

有幾個點需要注意一下

  • EventBus的構造除了名稱之外,還需要有ExceptionHandler和Executor,后兩個主要是給Dispatcher使用的。

  • registry和unregister都是通過Subscriber注冊表來完成的。

  • Event的提交則是由Dispatcher來完成的

  • Executor使用JDK中的Executor接口,如果我們自己開發的ThreadPool天生就是多線程并發執行任務的線程池,自帶異步處理能力,但是無法做到同步任務處理,因此我們使用Executor可以任意擴展同步、異步的任務處理方式。


異步EventBus

異步的EventBus比較簡單,繼承自同步Bus,然后將Thread-Per-Message用異步處理任務的Executor替換EventBus中的同步Executor即可

import java.util.concurrent.ThreadPoolExecutor;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 10:59* @mark: show me the code , change the world*/ public class AsyncEventBus extends EventBus {AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {super(busName, exceptionHandler, executor);}public AsyncEventBus(String busName, ThreadPoolExecutor executor) {this(busName, null, executor);}public AsyncEventBus(ThreadPoolExecutor executor) {this("default-async", null, executor);}public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {this("default-async", exceptionHandler, executor);} }

可以看到AsyncEventBus 重寫了父類EventBus的構造函數,使用ThreadPoolExecutor替代Executor。


Subscriber注冊表Registry (維護topic和subscriber之間的關系)

注冊表Registry維護了topic和subscriber之間的關系。

當有Event被post之后,Dispatcher需要知道該消息應該發送給哪個Subscriber的實例和對應的方法,Subscriber對象沒有任何特殊要求,就是普通的類不需要繼承任何父類或者實現任何接口

import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/1 23:42* @mark: show me the code , change the world*/class Registry {/*** 存儲Subscriber集合和topic之間關系的map*/private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();/*** 獲取Subscriber Object的方法集合然后進行綁定** @param subscriber*/public void bind(Object subscriber) {List<Method> subscribeMethods = getSubscribeMethods(subscriber);subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));}public void unbind(Object subscriber) {//unbind為了提高速度,只對Subscriber進行失效操作subscriberContainer.forEach((key, queue) ->queue.forEach(s ->{if (s.getSubscribeObject() == subscriber) {s.setDisable(true);}}));}public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {return subscriberContainer.get(topic);}private void tierSubscriber(Object subscriber, Method method) {final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);String topic = subscribe.topic();//當某topic沒有Subscriber Queue的時候創建一個subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());//創建一個Subscriber并且加入Subscriber列表中subscriberContainer.get(topic).add(new Subscriber(subscriber, method));}private List<Method> getSubscribeMethods(Object subscriber) {final List<Method> methods = new ArrayList<>();Class<?> temp = subscriber.getClass();//不斷獲取當前類和父類的所有@Subscribe方法while (temp != null) {//獲取所有的方法Method[] declaredMethods = temp.getDeclaredMethods();//只有public方法 &&有一個入參 &&最重要的是被@Subscribe標記的方法才符合回調方法Arrays.stream(declaredMethods).filter(m -> m.isAnnotationPresent(Subscribe.class)&& m.getParameterCount() == 1&& m.getModifiers() == Modifier.PUBLIC).forEach(methods::add);temp = temp.getSuperclass();}return methods;}}

由于Registry是在Bus中使用的,不能暴露給外部,因此Registry被設計成了包可見的類。

我們所設計的EventBus對Subscriber沒有做任何限制,但是要接受event的回調則需要將方法使用注解@Subscribe進行標記(可指定topic)

同一個Subscriber的不同方法通過@Subscribe注解之后可接受來自兩個不同的topic消息

public class SimpleObject {/*** subscribe方法,比如使用@Subscribe標記,并且是void類型且有一個參數*/@Subscribe(topic = "artisan-topic")public void test2(Integer x) {}@Subscribe(topic = "test-topic")public void test3(Integer x) {} }

SimpleObject的實例被注冊到了Event Bus之后,test2和test3這兩個方法將會被加入到注冊表中,分別用來接受來自artisan-topic和test-topic的event 。


Event廣播Dispatcher

Dispatcher的主要作用是將EventBus post的event推送給每一個注冊到topic上的subscriber上,具體的推送其實就是執行被@Subscribe注解的方法。

import java.lang.reflect.Method; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 00:36* @mark: show me the code , change the world*/ public class Dispatcher {private final Executor executorService;private final EventExceptionHandler exceptionHandler;public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {this.executorService = executorService;this.exceptionHandler = exceptionHandler;}public void dispatch(Bus bus, Registry registry, Object event, String topic) {//根據topic獲取所有的Subscriber列表ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);if (null == subscribers) {if (exceptionHandler != null) {exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"),new BaseEventContext(bus.getBusName(), null, event));}return;}//遍歷所有的方法,并且通過反射的方式進行方法調用subscribers.stream().filter(subscriber -> !subscriber.isDisable()).filter(subscriber ->{Method subscribeMethod = subscriber.getSubscribeMethod();Class<?> aClass = subscribeMethod.getParameterTypes()[0];return (aClass.isAssignableFrom(event.getClass()));}).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));}private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {Method subscribeMethod = subscriber.getSubscribeMethod();Object subscribeObject = subscriber.getSubscribeObject();executorService.execute(() -> {try {subscribeMethod.invoke(subscribeObject, event);} catch (Exception e) {if (null != exceptionHandler) {exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));}}});}public void close() {if (executorService instanceof ExecutorService) {((ExecutorService) executorService).shutdown();}}static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {return new Dispatcher(executor, exceptionHandler);}static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);}static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);}/*** 順序執行的ExecutorService*/private static class SeqExecutorService implements Executor {private final static SeqExecutorService INSTANCE = new SeqExecutorService();@Overridepublic void execute(Runnable command) {command.run();}}/*** 每個線程負責一次消息推送*/private static class PreThreadExecutorService implements Executor {private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();@Overridepublic void execute(Runnable command) {new Thread(command).start();}}/*** 默認的EventContext實現*/private static class BaseEventContext implements EventContext {private final String eventBusName;private final Subscriber subscriber;private final Object event;private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {this.eventBusName = eventBusName;this.subscriber = subscriber;this.event = event;}@Overridepublic String getSource() {return this.eventBusName;}@Overridepublic Object getSubscriber() {return subscriber != null ? subscriber.getSubscribeObject() : null;}@Overridepublic Method getSubscribe() {return subscriber != null ? subscriber.getSubscribeMethod() : null;}@Overridepublic Object getEvent() {return this.event;}} }

在Dispatcher中,除了從Registry中獲取對應的Subscriber執行之外,我們還定義了幾個靜態內部類,其主要是實現了Executor接口和EventContent。

其他類接口設計

除了上面一些比較核心的類之外,還需要Subscriber封裝類以及EventContext、Event-ExceptionHandler接口

【Subscriber類】

import java.lang.reflect.Method;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/1 23:33* @mark: show me the code , change the world*/ public class Subscriber {private final Object subscribeObject;private final Method subscribeMethod;private boolean disable = false;public Subscriber(Object subscribeObject, Method subscribeMethod) {this.subscribeObject = subscribeObject;this.subscribeMethod = subscribeMethod;}public Object getSubscribeObject() {return subscribeObject;}public Method getSubscribeMethod() {return subscribeMethod;}public boolean isDisable() {return disable;}public void setDisable(boolean disable) {this.disable = disable;} }

Subscriber類封裝了對象實例和被@Subscribe標記的方法,也就是說一個對象實例有可能會被封裝成若干個Subscriber


【EventExceptionHandler接口】

EventBus會將方法的調用交給Runnable接口去執行,我們都知道Runnable接口不能拋出checked異常信息,并且在每一個subscribe方法中,也不允許將異常拋出從而影響EventBus對后續Subscriber進行消息推送,但是異常信息又不能被忽略掉,因此注冊一個異常回調接口就可以知道在進行消息廣播推送時都發生了什么

public interface EventExceptionHandler {void handle(Throwable cause, EventContext context); }

【EventContext接口】

EventContext接口提供了獲取消息源、消息體,以及該消息是由哪一個Subscriber的哪個subscribe方法所接受,主要用于消息推送出錯時被回調接口EventExceptionHandler使用

import java.lang.reflect.Method;public interface EventContext {String getSource();Object getSubscriber();Method getSubscribe();Object getEvent();}

測試

我們簡單地定義兩個普通對象SimpleSubscriber1和SimpleSubscriber2

/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 11:06* @mark: show me the code , change the world*/ public class SimpleSubscriber1 {@Subscribepublic void method1(String message) {System.out.println(String.format("線程 %s , SimpleSubscriber2#method1 called --- %s ",Thread.currentThread().getName() ,message));}@Subscribe(topic = "test")public void method2(String message) {System.out.println(String.format("線程:%s: Test Topic | SimpleSubscriber2#method2 called --- %s", Thread.currentThread().getName(), message));}} /*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 11:27* @mark: show me the code , change the world*/ public class SimpleSubscriber2 {@Subscribepublic void method1(String message) {System.out.println(String.format("線程 %s , SimpleSubscriber2#method1 called --- %s ",Thread.currentThread().getName() ,message));}@Subscribe(topic = "test")public void method2(String message) {System.out.println(String.format("線程:%s: Test Topic | SimpleSubscriber2#method2 called --- %s", Thread.currentThread().getName(), message));} }

模擬復雜消息

/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 11:28* @mark: show me the code , change the world*/ public class SimpleSubscriber3 {@Subscribepublic void method1(Object message) {if (message instanceof WildMessage){System.out.println(String.format("線程 %s , SimpleSubscriber3#method1 called --- %s ",Thread.currentThread().getName() ,((WildMessage)message).getData()));}}} /*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 13:16* @mark: show me the code , change the world*/ public class WildMessage {private String data;public WildMessage(String data) {this.data = data;}public String getData() {return data;}public void setData(String data) {this.data = data;} }

同步&異步 Event Bus

import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 13:32* @mark: show me the code , change the world*/ public class Test {public static void main(String[] args) {Bus bus = new EventBus("TestBus");// 注冊bus.register(new SimpleSubscriber1());bus.register(new SimpleSubscriber2());// 發布消息bus.post("Hello");bus.post("Hello", "test");bus.register(new SimpleSubscriber3());bus.post(new WildMessage("SourceMessage"));System.out.println("\n\n\n\n");System.out.println("-------異步-----");Bus asyncEventBus = new AsyncEventBus("TestBus", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));asyncEventBus.register(new SimpleSubscriber1());asyncEventBus.register(new SimpleSubscriber2());asyncEventBus.post("Hello");asyncEventBus.post("Hello", "test");}}

解析下結果哈: 同步的EventBus,將三個普通的對象注冊給了bus,當bus發送Event的時候topic相同,Event類型相同的subscribe方法將被執行。

同步的Event Bus有個缺點,若其中的一個subscribe方法運行時間比較長,則會影響下一個subscribe方法的執行,因此采用AsyncEventBus是另外一個比較好的選擇。

總結

以上是生活随笔為你收集整理的Java Review - Java进程内部的消息中间件_Event Bus设计模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。