«1. Обзор

JGroups — это Java API для надежного обмена сообщениями. Он имеет простой интерфейс, который обеспечивает:

    гибкий стек протоколов, включая фрагментацию TCP и UDP и повторную сборку больших сообщений надежное управление потоком обнаружения отказов одноадресной и многоадресной рассылки

А также многие другие функции.

В этом руководстве мы создадим простое приложение для обмена строковыми сообщениями между приложениями и предоставления общего состояния новым приложениям по мере их подключения к сети.

2. Настройка

2.1. Зависимость Maven

Нам нужно добавить одну зависимость в наш pom.xml:

<dependency>
    <groupId>org.jgroups</groupId>
    <artifactId>jgroups</artifactId>
    <version>4.0.10.Final</version>
</dependency>

Последнюю версию библиотеки можно проверить на Maven Central.

2.2. Сеть

JGroups попытается использовать IPV6 по умолчанию. В зависимости от конфигурации нашей системы это может привести к тому, что приложения не смогут обмениваться данными.

Чтобы избежать этого, мы установим для свойства java.net.preferIPv4Stack значение true при запуске наших приложений здесь:

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger

3. JChannels

Наше соединение с сетью JGroups — это JChannel. Канал присоединяется к кластеру и отправляет и получает сообщения, а также информацию о состоянии сети.

3.1. Создание канала

Мы создаем JChannel с путем к файлу конфигурации. Если мы опустим имя файла, он будет искать udp.xml в текущем рабочем каталоге.

Мы создадим канал с конфигурационным файлом с явным именем:

JChannel channel = new JChannel("src/main/resources/udp.xml");

Конфигурация JGroups может быть очень сложной, но стандартных конфигураций UDP и TCP достаточно для большинства приложений. Мы включили файл для UDP в наш код и будем использовать его в этом руководстве.

Для получения дополнительной информации о настройке транспорта см. руководство JGroups здесь.

3.2. Подключение канала

После того, как мы создали наш канал, нам нужно присоединиться к кластеру. Кластер — это группа узлов, которые обмениваются сообщениями.

Для присоединения к кластеру требуется имя кластера:

channel.connect("Baeldung");

Первый узел, который попытается присоединиться к кластеру, создаст его, если он не существует. Ниже мы увидим этот процесс в действии.

3.3. Именование канала

Узлы идентифицируются по имени, поэтому одноранговые узлы могут отправлять направленные сообщения и получать уведомления о том, кто входит и выходит из кластера. JGroups назначит имя автоматически, или мы можем установить свое собственное:

channel.name("user1");

Мы будем использовать эти имена ниже, чтобы отслеживать, когда узлы входят в кластер и покидают его.

3.4. Закрытие канала

Очистка канала необходима, если мы хотим, чтобы одноранговые узлы получали своевременное уведомление о выходе.

Мы закрываем JChannel с помощью его метода close:

channel.close()

4. Изменения в представлении кластера

После создания JChannel мы готовы видеть состояние одноранговых узлов в кластере и обмениваться с ними сообщениями.

JGroups поддерживает состояние кластера внутри класса View. Каждый канал имеет один вид сети. Когда представление изменяется, оно доставляется с помощью обратного вызова viewAccepted().

В этом руководстве мы расширим API-класс ReceiverAdaptor, который реализует все методы интерфейса, необходимые для приложения.

Это рекомендуемый способ реализации обратных вызовов.

Давайте добавим viewAccepted в наше приложение:

public void viewAccepted(View newView) {

    private View lastView;

    if (lastView == null) {
        System.out.println("Received initial view:");
        newView.forEach(System.out::println);
    } else {
        System.out.println("Received new view.");

        List<Address> newMembers = View.newMembers(lastView, newView);
        System.out.println("New members: ");
        newMembers.forEach(System.out::println);

        List<Address> exMembers = View.leftMembers(lastView, newView);
        System.out.println("Exited members:");
        exMembers.forEach(System.out::println);
    }
    lastView = newView;
}

Каждое представление содержит список объектов Address, представляющих каждого члена кластера. JGroups предлагает удобные методы для сравнения одного представления с другим, которые мы используем для обнаружения новых или закрытых членов кластера.

5. Отправка сообщений

Обработка сообщений в JGroups проста. Сообщение содержит массив байтов и объекты Address, соответствующие отправителю и получателю.

В этом руководстве мы используем строки, считываемые из командной строки, но легко увидеть, как приложение может обмениваться данными других типов.

5.1. Широковещательные сообщения

Сообщение создается с назначением и массивом байтов; JChannel устанавливает для нас отправителя. Если цель равна null, весь кластер получит сообщение.

Примем текст из командной строки и отправим его в кластер:

System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);

«

«Если мы запустим несколько экземпляров нашей программы и отправим это сообщение (после того, как мы реализуем метод receive() ниже), все они получат его, включая отправителя.

5.2. Блокировка наших сообщений

channel.setDiscardOwnMessages(true);

Если мы не хотим видеть наши сообщения, мы можем установить для этого свойство:

Когда мы запускаем предыдущий тест, отправитель сообщения не получает свое широковещательное сообщение.

5.3. Прямые сообщения

Для отправки прямого сообщения требуется действительный адрес. Если мы обращаемся к узлам по имени, нам нужен способ поиска адреса. К счастью, у нас есть View для этого.

private Optional<address> getAddress(String name) { 
    View view = channel.view(); 
    return view.getMembers().stream()
      .filter(address -> name.equals(address.toString()))
      .findAny(); 
}

Текущее представление всегда доступно из JChannel:

Имена адресов доступны через метод класса toString(), поэтому мы просто ищем нужное имя в списке членов кластера.

Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName)
  .orElseThrow(() -> new Exception("Destination not found"); 
Message message = new Message(destination, "Hi there!"); 
channel.send(message);

Итак, мы можем принять имя из консоли, найти связанный пункт назначения и отправить прямое сообщение:

6. Получение сообщений

Мы можем отправлять сообщения, теперь давайте добавим попытку их получения в настоящее время.

public void receive(Message message) {
    String line = Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);
}

Давайте переопределим пустой метод получения ReceiverAdaptor:

Поскольку мы знаем, что сообщение содержит строку, мы можем безопасно передать getObject() в System.out.

7. Обмен состояниями

Когда узел входит в сеть, ему может потребоваться получить информацию о состоянии кластера. JGroups предоставляет для этого механизм передачи состояния.

Когда узел присоединяется к кластеру, он просто вызывает getState(). Кластер обычно получает состояние от самого старшего члена группы — координатора.

private Integer messageCount = 0;

public void receive(Message message) {
    String line = "Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);

    if (message.getDest() == null) {
        messageCount++;
        System.out.println("Message count: " + messageCount);
    }
}

Давайте добавим счетчик широковещательных сообщений в наше приложение. Мы добавим новую переменную-член и увеличим ее внутри Receive():

Мы проверяем нулевой пункт назначения, потому что, если мы подсчитываем прямые сообщения, каждый узел будет иметь разный номер.

public void setState(InputStream input) {
    try {
        messageCount = Util.objectFromStream(new DataInputStream(input));
    } catch (Exception e) {
        System.out.println("Error deserialing state!");
    }
    System.out.println(messageCount + " is the current messagecount.");
}

public void getState(OutputStream output) throws Exception {
    Util.objectToStream(messageCount, new DataOutputStream(output));
}

Далее мы переопределяем еще два метода в ReceiverAdaptor:

Подобно сообщениям, JGroups передает состояние в виде массива байтов.

JGroups предоставляет координатору InputStream для записи состояния и OutputStream для чтения новым узлом. API предоставляет удобные классы для сериализации и десериализации данных.

Обратите внимание, что в производственном коде доступ к информации о состоянии должен быть потокобезопасным.

channel.connect(clusterName);
channel.getState(null, 0);

Наконец, мы добавляем вызов getState() в наш запуск, после того как мы подключимся к кластеру:

getState() принимает пункт назначения, из которого запрашивается состояние, и время ожидания в миллисекундах. Нулевой пункт назначения указывает на координатора, а 0 означает отсутствие тайм-аута.

Когда мы запускаем это приложение с парой узлов и обмениваемся широковещательными сообщениями, мы видим увеличение количества сообщений.

Затем, если мы добавим третьего клиента или остановим и запустим один из них, мы увидим, что вновь подключенный узел печатает правильное количество сообщений.

8. Заключение

В этом уроке мы использовали JGroups для создания приложения для обмена сообщениями. Мы использовали API для отслеживания того, какие узлы подключались к кластеру и покидали его, а также для передачи состояния кластера новому узлу при его присоединении.