Introduction
最近在看Domain Event的實作方式,剛好看到Teddy分享使用Guava的EventBus來處理事件的註冊與分派,於是花了一些時間實驗API的用法。以下內容分享給大家。Source code可以從link下載。
How to?
Event
我的範例事件為MJGotGodGuyCardEvent:
public class MJGotGodGuyCardEvent { private final String girlName; public MJGotGodGuyCardEvent(String girlName) { this.girlName = girlName; } public String getGirlName() { return girlName; } }
Event Handler
我的事件處理者有兩個,第一個為MJGoToPronhub。需注意的是:
- @Subscribe: 用以宣告處理函式,函式只允許一個參數,且要與Event物件相同。
- @AllowConcurrentEvents: 用以告知EventBus此函式可以接受Concurrent存取,預設會使用循序方式存取此函式。
public class MJGoToPronhub { private static Logger logger = LoggerFactory.getLogger(MJGoToPronhub.class); private List<MJGotGodGuyCardEvent> receivedEvents = new CopyOnWriteArrayList<>(); @AllowConcurrentEvents @Subscribe public void handle(MJGotGodGuyCardEvent event) { receivedEvents.add(event); logger.info("MJGoToPronhub due to {}", event.getGirlName()); } public List<MJGotGodGuyCardEvent> getReceivedEvents(){ return receivedEvents; } }
另外一個實作內容類似,不做贅述,名稱為MJGoToWanhua。
EventBus
用法相當簡單,只要透過EventBus的register把處理物件進去後,在負責發送通知的client使用post即可“循序”的讓handler處理訊息:
public class TestEventBus { private EventBus eventBus = new EventBus(); private MJGoToPronhub MJGoToPronhubHandler = new MJGoToPronhub(); private MJGoToWanhua MJGoToWanhuaHandler = new MJGoToWanhua(); private void thenTheHandlerShouldReceiveTheEvent() { assertEquals(1, MJGoToPronhubHandler.getReceivedEvents().size()); assertEquals(1, MJGoToWanhuaHandler.getReceivedEvents().size()); } private void givenEventBusRegisterTwoGoodHandler(EventBus eventBus) { eventBus.register(MJGoToPronhubHandler); eventBus.register(MJGoToWanhuaHandler); } @Test public void ShouldGetReceivedEventsWhenPostEventToHandlers() { givenEventBusRegisterTwoGoodHandler(eventBus); MJGotGodGuyCardEvent event = new MJGotGodGuyCardEvent("Nancy"); eventBus.post(event); thenTheHandlerShouldReceiveTheEvent(); } }
AsyncEventBus
假如覺得循序處理太慢,可以使用AsyncEventBus,使用方法與EventBus相同,但它post是non-blocking的:
public class TestAsyncEventBus { private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool()); private MJGoToPronhub MJGoToPronhubHandler = new MJGoToPronhub(); private MJGoToWanhua MJGoToWanhuaHandler = new MJGoToWanhua(); private void thenTheHandlerShouldReceiveTheEvent() { assertEquals(1, MJGoToPronhubHandler.getReceivedEvents().size()); assertEquals(1, MJGoToWanhuaHandler.getReceivedEvents().size()); } private void givenEventBusRegisterTwoGoodHandler(EventBus eventBus) { eventBus.register(MJGoToPronhubHandler); eventBus.register(MJGoToWanhuaHandler); } private MJGotGodGuyCardEvent givenDelayedMJGotGodGuyCardEvent(CountDownLatch latch) { return new MJGotGodGuyCardEvent("Nancy") { @Override public String getGirlName() { try { TimeUnit.SECONDS.sleep(1); return super.getGirlName(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { latch.countDown(); } } }; } private void thenPostShouldBeNotBlocked(long startTime) { long afterTime = System.currentTimeMillis(); assertTrue((afterTime-startTime)<1000); } private void thenPostShouldBeDoneWithParallel(long startTime, CountDownLatch latch) throws InterruptedException { latch.await(); long afterTime = System.currentTimeMillis(); assertTrue((afterTime-startTime)>1000); } @Test public void testAsyncEventBus() throws InterruptedException { givenEventBusRegisterTwoGoodHandler(asyncEventBus); CountDownLatch latch = new CountDownLatch(2); MJGotGodGuyCardEvent event = givenDelayedMJGotGodGuyCardEvent(latch); long startTime = System.currentTimeMillis(); asyncEventBus.post(event); thenPostShouldBeNotBlocked(startTime); thenPostShouldBeDoneWithParallel(startTime, latch); thenTheHandlerShouldReceiveTheEvent(); } }
Notes
- 我使用的guava版本為30.0-jre。
- 預設情況下,Handler處理發生例外時,並不影響工作繼續進行。
- 可以透過實做SubscriberExceptionHandler去達到自己的例外處理需求,可由constructor去注入。
留言
張貼留言