1. Обзор

Проще говоря, MBassador — это высокопроизводительная шина событий, использующая семантику публикации-подписки.

Сообщения передаются одному или нескольким узлам без предварительного знания о количестве подписчиков или о том, как они используют сообщение.

2. Зависимость от Maven

Прежде чем мы сможем использовать библиотеку, нам нужно добавить зависимость от mbassador:

3. Базовая обработка событий

<dependency>
    <groupId>net.engio</groupId>
    <artifactId>mbassador</artifactId>
    <version>1.3.1</version>
</dependency>

3.1. Простой пример

Мы начнем с простого примера публикации сообщения:

В верхней части этого тестового класса мы видим создание MBassador с его конструктором по умолчанию. Затем в методе @Before мы вызываем subscribe() и передаем ссылку на сам класс.

private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenStringDispatched_thenHandleString() {
    dispatcher.post("TestString").now();
 
    assertNotNull(messageString);
    assertEquals("TestString", messageString);
}

@Handler
public void handleString(String message) {
    messageString = message;
}

В методе subscribe() диспетчер проверяет подписчика на наличие аннотаций @Handler.

И в первом тесте мы вызываем dispatcher.post(…).now() для отправки сообщения, что приводит к вызову handleString().

Этот начальный тест демонстрирует несколько важных концепций. Любой объект может быть подписчиком, если у него есть один или несколько методов, аннотированных с помощью @Handler. Подписчик может иметь любое количество обработчиков.

Мы используем тестовые объекты, которые подписываются сами на себя для простоты, но в большинстве производственных сценариев диспетчеры сообщений относятся к классам, отличным от классов потребителей.

Методы-обработчики имеют только один входной параметр — сообщение и не могут генерировать проверенные исключения.

Подобно методу subscribe(), метод post принимает любой объект. Этот объект доставляется подписчикам.

Когда сообщение публикуется, оно доставляется всем слушателям, которые подписались на тип сообщения.

Давайте добавим еще один обработчик сообщений и отправим сообщение другого типа:

Как и ожидалось, когда мы отправляем Integer, вызывается handleInteger(), а handleString() — нет. Один диспетчер может использоваться для отправки более одного типа сообщений.

private Integer messageInteger; 

@Test
public void whenIntegerDispatched_thenHandleInteger() {
    dispatcher.post(42).now();
 
    assertNull(messageString);
    assertNotNull(messageInteger);
    assertTrue(42 == messageInteger);
}

@Handler
public void handleInteger(Integer message) {
    messageInteger = message;
}

3.2. Мертвые сообщения

Итак, куда девается сообщение, если для него нет обработчика? Давайте добавим новый обработчик событий, а затем отправим сообщение третьего типа:

В этом тесте мы отправляем Long вместо Integer. Ни handleInteger(), ни handleString() не вызываются, но вызывается handleDeadEvent().

private Object deadEvent; 

@Test
public void whenLongDispatched_thenDeadEvent() {
    dispatcher.post(42L).now();
 
    assertNull(messageString);
    assertNull(messageInteger);
    assertNotNull(deadEvent);
    assertTrue(deadEvent instanceof Long);
    assertTrue(42L == (Long) deadEvent);
} 

@Handler
public void handleDeadEvent(DeadMessage message) {
    deadEvent = message.getMessage();
}

Когда для сообщения нет обработчиков, оно помещается в объект DeadMessage. Так как мы добавили обработчик для Deadmessage, мы его захватываем.

DeadMessage можно смело игнорировать; если приложению не нужно отслеживать мертвые сообщения, их можно разрешить никуда не девать.

4. Использование иерархии событий

Отправка строковых и целочисленных событий имеет ограничения. Давайте создадим несколько классов сообщений:

У нас есть простой базовый класс и два класса, которые его расширяют.

public class Message {}

public class AckMessage extends Message {}

public class RejectMessage extends Message {
    int code;

    // setters and getters
}

4.1. Отправка сообщения базового класса

Начнем с событий Message:

Откройте для себя MBassador — высокопроизводительную шину событий pub-sub. Это ограничивает нас использованием сообщений, но добавляет дополнительный уровень безопасности типов.

private MBassador<Message> dispatcher = new MBassador<>();

private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenMessageDispatched_thenMessageHandled() {
    dispatcher.post(new Message()).now();
    assertNotNull(message);
    assertNull(ackMessage);
    assertNull(rejectMessage);
}

@Handler
public void handleMessage(Message message) {
    this.message = message;
}

@Handler
public void handleRejectMessage(RejectMessage message) {
   rejectMessage = message;
}

@Handler
public void handleAckMessage(AckMessage message) {
    ackMessage = message;
}

Когда мы отправляем сообщение, handleMessage() получает его. Два других обработчика этого не делают.

4.2. Отправка сообщения подкласса

Давайте отправим сообщение RejectMessage:

Когда мы отправляем сообщение RejectMessage, его получают и handleRejectMessage(), и handleMessage().

@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(message);
    assertNotNull(rejectMessage);
    assertNull(ackMessage);
}

Поскольку RejectMessage расширяет Message, обработчик Message получил его в дополнение к обработчику RejectMessage.

Давайте проверим это поведение с помощью AckMessage:

Как и ожидалось, когда мы отправляем AckMessage, его получают и handleAckMessage(), и handleMessage().

@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
    dispatcher.post(new AckMessage()).now();
 
    assertNotNull(message);
    assertNotNull(ackMessage);
    assertNull(rejectMessage);
}

5. Фильтрация сообщений

Организация сообщений по типу уже является мощной функцией, но мы можем фильтровать их еще лучше.

5.1. Фильтр по классу и подклассу

Когда мы отправили RejectMessage или AckMessage, мы получили событие как в обработчике событий для определенного типа, так и в базовом классе.

Мы можем решить эту проблему иерархии типов, сделав Message абстрактным и создав класс, такой как GenericMessage. Но что, если у нас нет такой роскоши?

Мы можем использовать фильтры сообщений:

«

private Message baseMessage;
private Message subMessage;

@Test
public void whenMessageDispatched_thenMessageFiltered() {
    dispatcher.post(new Message()).now();
 
    assertNotNull(baseMessage);
    assertNull(subMessage);
}

@Test
public void whenRejectDispatched_thenRejectFiltered() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(subMessage);
    assertNull(baseMessage);
}

@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
    this.baseMessage = message;
}

@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
    this.subMessage = message;
}

«Параметр фильтров для аннотации @Handler принимает класс, который реализует IMessageFilter. Библиотека предлагает два примера:

Filters.RejectSubtypes делает то, что следует из названия: она отфильтровывает любые подтипы. В этом случае мы видим, что RejectMessage не обрабатывается handleBaseMessage().

Filters.SubtypesOnly также делает то, что следует из его названия: он отфильтровывает любые базовые типы. В этом случае мы видим, что Message не обрабатывается функцией handleSubMessage().

5.2. IMessageFilter

Filters.RejectSubtypes и Filters.SubtypesOnly реализуют IMessageFilter.

RejectSubTypes сравнивает класс сообщения с его определенными типами сообщений и разрешает только сообщения, соответствующие одному из его типов, в отличие от любых подклассов.

5.3. Фильтровать с условиями

К счастью, есть более простой способ фильтрации сообщений. MBassador поддерживает подмножество выражений Java EL в качестве условий для фильтрации сообщений.

Давайте отфильтруем строковое сообщение по его длине:

private String testString;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

Сообщение «foobar!» состоит из семи символов и фильтруется. Давайте отправим более короткую строку:


@Test
public void whenShortStringDispatched_thenStringHandled() {
    dispatcher.post("foobar").now();
 
    assertNotNull(testString);
}

Теперь «foobar» имеет длину всего шесть символов и передается.

Наше RejectMessage содержит поле с аксессором. Давайте напишем для этого фильтр:

private RejectMessage rejectMessage;

@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {

    RejectMessage testReject = new RejectMessage();
    testReject.setCode(-1);

    dispatcher.post(testReject).now();
 
    assertNull(rejectMessage);
    assertNotNull(subMessage);
    assertEquals(-1, ((RejectMessage) subMessage).getCode());
}

@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
    this.rejectMessage = rejectMessage;
}

Здесь снова мы можем запросить метод объекта и либо отфильтровать сообщение, либо нет.

5.4. Захват отфильтрованных сообщений

Подобно DeadEvents, мы можем захотеть захватить и обработать отфильтрованные сообщения. Также существует специальный механизм для захвата отфильтрованных событий. Отфильтрованные события обрабатываются иначе, чем «мертвые» события.

Давайте напишем тест, иллюстрирующий это:

private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
    assertNotNull(filteredMessage);
    assertTrue(filteredMessage.getMessage() instanceof String);
    assertNull(deadMessage);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

@Handler
public void handleFilterMessage(FilteredMessage message) {
    this.filteredMessage = message;
}

@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
    this.deadMessage = deadMessage;
}

С добавлением обработчика FilteredMessage мы можем отслеживать строки, отфильтрованные из-за их длины. filterMessage содержит нашу слишком длинную строку, в то время как deadMessage остается нулевым.

6. Асинхронная отправка сообщений и обработка

До сих пор во всех наших примерах использовалась синхронная отправка сообщений; когда мы вызывали post.now(), сообщения доставлялись каждому обработчику в том же потоке, из которого мы вызывали post().

6.1. Асинхронная отправка

MBassador.post() возвращает SyncAsyncPostCommand. Этот класс предлагает несколько методов, в том числе:

    now() — синхронная отправка сообщений; вызов будет заблокирован до тех пор, пока все сообщения не будут доставлены. Мы будем использовать Awaitility в этих тестах, чтобы упростить код:

Мы вызываем asynchronously() в этом тесте и используем AtomicBoolean в качестве флага с await() для ожидания доставки сообщения потоком доставки.

private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenAsyncDispatched_thenMessageReceived() {
    dispatcher.post("foobar").asynchronously();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testString);
}

@Handler
public void handleStringMessage(String message) {
    this.testString = message;
    ready.set(true);
}

Если мы закомментируем вызов await(), мы рискуем провалить тест, потому что мы проверяем testString до завершения потока доставки.

6.2. Вызов асинхронного обработчика

Асинхронная отправка позволяет провайдеру сообщений вернуться к обработке сообщений до того, как сообщения будут доставлены каждому обработчику, но он по-прежнему вызывает каждый обработчик по порядку, и каждый обработчик должен ждать завершения предыдущего.

Это может привести к проблемам, если один обработчик выполняет дорогостоящую операцию.

MBassador предоставляет механизм асинхронного вызова обработчика. Обработчики, настроенные для этого, получают сообщения в своем потоке:

Обработчики могут запрашивать асинхронный вызов с помощью свойства delivery = Invoke.Asynchronously в аннотации Handler. Мы проверяем это в нашем тесте, сравнивая имена потоков в методе диспетчеризации и обработчике.

private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenHandlerAsync_thenHandled() {
    dispatcher.post(42).now();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testInteger);
    assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}

@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {
 
    this.invocationThreadName = Thread.currentThread().getName();
    this.testInteger = message;
    ready.set(true);
}

7. Настройка MBassador

До сих пор мы использовали экземпляр MBassador с конфигурацией по умолчанию. Поведение диспетчера можно изменить с помощью аннотаций, аналогичных тем, которые мы видели до сих пор; мы рассмотрим еще несколько, чтобы закончить этот урок.

7.1. Обработка исключений

Обработчики не могут определять проверенные исключения. Вместо этого диспетчеру можно предоставить IPublicationErrorHandler в качестве аргумента его конструктору:

«

public class MBassadorConfigurationTest
  implements IPublicationErrorHandler {

    private MBassador dispatcher;
    private String messageString;
    private Throwable errorCause;

    @Before
    public void prepareTests() {
        dispatcher = new MBassador<String>(this);
        dispatcher.subscribe(this);
    }

    @Test
    public void whenErrorOccurs_thenErrorHandler() {
        dispatcher.post("Error").now();
 
        assertNull(messageString);
        assertNotNull(errorCause);
    }

    @Test
    public void whenNoErrorOccurs_thenStringHandler() {
        dispatcher.post("Error").now();
 
        assertNull(errorCause);
        assertNotNull(messageString);
    }

    @Handler
    public void handleString(String message) {
        if ("Error".equals(message)) {
            throw new Error("BOOM");
        }
        messageString = message;
    }

    @Override
    public void handleError(PublicationError error) {
        errorCause = error.getCause().getCause();
    }
}

«Когда handleString() выдает ошибку, она сохраняется в errorCause.

7.2. Приоритет обработчиков

Обработчики вызываются в порядке, обратном их добавлению, но это не то поведение, на которое мы хотим полагаться. Даже имея возможность вызывать обработчики в своих потоках, нам все равно может понадобиться знать, в каком порядке они будут вызываться.

Мы можем явно установить приоритет обработчика:

private LinkedList<Integer> list = new LinkedList<>();

@Test
public void whenRejectDispatched_thenPriorityHandled() {
    dispatcher.post(new RejectMessage()).now();

    // Items should pop() off in reverse priority order
    assertTrue(1 == list.pop());
    assertTrue(3 == list.pop());
    assertTrue(5 == list.pop());
}

@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
    list.push(5);
}

@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
    list.push(3);
}

@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage) 
    logger.error("Reject handler #3");
    list.push(3);
}

@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
    list.push(1);
}

Обработчики вызываются от наивысшего приоритета к низшему. Обработчики с приоритетом по умолчанию, равным нулю, вызываются последними. Мы видим, что номера обработчиков pop() отключаются в обратном порядке.

7.3. Отказ от подтипов, простой способ

Что случилось с handleMessage() в приведенном выше тесте? Нам не нужно использовать RejectSubTypes.class для фильтрации наших подтипов.

RejectSubTypes — это логический флаг, который обеспечивает ту же фильтрацию, что и класс, но с более высокой производительностью, чем реализация IMessageFilter.

Однако нам по-прежнему нужно использовать реализацию на основе фильтров только для приема подтипов.

8. Заключение

MBassador — простая и понятная библиотека для передачи сообщений между объектами. Сообщения могут быть организованы различными способами и могут быть отправлены синхронно или асинхронно.

И, как всегда, пример доступен в этом проекте GitHub.