«1. Обзор
JGroups — это Java API для надежного обмена сообщениями. Он имеет простой интерфейс, который обеспечивает:
-
гибкий стек протоколов, включая фрагментацию TCP и UDP и повторную сборку больших сообщений надежное управление потоком обнаружения отказов одноадресной и многоадресной рассылки
А также многие другие функции.
В этом руководстве мы создадим простое приложение для обмена строковыми сообщениями между приложениями и предоставления общего состояния новым приложениям по мере их подключения к сети.
2. Настройка
2.1. Зависимость Maven
Нам нужно добавить одну зависимость в наш pom.xml:
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>4.0.10.Final</version>
</dependency>
Последнюю версию библиотеки можно проверить на Maven Central.
2.2. Сеть
JGroups попытается использовать IPV6 по умолчанию. В зависимости от конфигурации нашей системы это может привести к тому, что приложения не смогут обмениваться данными.
Чтобы избежать этого, мы установим для свойства java.net.preferIPv4Stack значение true при запуске наших приложений здесь:
java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger
3. JChannels
Наше соединение с сетью JGroups — это JChannel. Канал присоединяется к кластеру и отправляет и получает сообщения, а также информацию о состоянии сети.
3.1. Создание канала
Мы создаем JChannel с путем к файлу конфигурации. Если мы опустим имя файла, он будет искать udp.xml в текущем рабочем каталоге.
Мы создадим канал с конфигурационным файлом с явным именем:
JChannel channel = new JChannel("src/main/resources/udp.xml");
Конфигурация JGroups может быть очень сложной, но стандартных конфигураций UDP и TCP достаточно для большинства приложений. Мы включили файл для UDP в наш код и будем использовать его в этом руководстве.
Для получения дополнительной информации о настройке транспорта см. руководство JGroups здесь.
3.2. Подключение канала
После того, как мы создали наш канал, нам нужно присоединиться к кластеру. Кластер — это группа узлов, которые обмениваются сообщениями.
Для присоединения к кластеру требуется имя кластера:
channel.connect("Baeldung");
Первый узел, который попытается присоединиться к кластеру, создаст его, если он не существует. Ниже мы увидим этот процесс в действии.
3.3. Именование канала
Узлы идентифицируются по имени, поэтому одноранговые узлы могут отправлять направленные сообщения и получать уведомления о том, кто входит и выходит из кластера. JGroups назначит имя автоматически, или мы можем установить свое собственное:
channel.name("user1");
Мы будем использовать эти имена ниже, чтобы отслеживать, когда узлы входят в кластер и покидают его.
3.4. Закрытие канала
Очистка канала необходима, если мы хотим, чтобы одноранговые узлы получали своевременное уведомление о выходе.
Мы закрываем JChannel с помощью его метода close:
channel.close()
4. Изменения в представлении кластера
После создания JChannel мы готовы видеть состояние одноранговых узлов в кластере и обмениваться с ними сообщениями.
JGroups поддерживает состояние кластера внутри класса View. Каждый канал имеет один вид сети. Когда представление изменяется, оно доставляется с помощью обратного вызова viewAccepted().
В этом руководстве мы расширим API-класс ReceiverAdaptor, который реализует все методы интерфейса, необходимые для приложения.
Это рекомендуемый способ реализации обратных вызовов.
Давайте добавим viewAccepted в наше приложение:
public void viewAccepted(View newView) {
private View lastView;
if (lastView == null) {
System.out.println("Received initial view:");
newView.forEach(System.out::println);
} else {
System.out.println("Received new view.");
List<Address> newMembers = View.newMembers(lastView, newView);
System.out.println("New members: ");
newMembers.forEach(System.out::println);
List<Address> exMembers = View.leftMembers(lastView, newView);
System.out.println("Exited members:");
exMembers.forEach(System.out::println);
}
lastView = newView;
}
Каждое представление содержит список объектов Address, представляющих каждого члена кластера. JGroups предлагает удобные методы для сравнения одного представления с другим, которые мы используем для обнаружения новых или закрытых членов кластера.
5. Отправка сообщений
Обработка сообщений в JGroups проста. Сообщение содержит массив байтов и объекты Address, соответствующие отправителю и получателю.
В этом руководстве мы используем строки, считываемые из командной строки, но легко увидеть, как приложение может обмениваться данными других типов.
5.1. Широковещательные сообщения
Сообщение создается с назначением и массивом байтов; JChannel устанавливает для нас отправителя. Если цель равна null, весь кластер получит сообщение.
Примем текст из командной строки и отправим его в кластер:
System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);
«
«Если мы запустим несколько экземпляров нашей программы и отправим это сообщение (после того, как мы реализуем метод receive() ниже), все они получат его, включая отправителя.
5.2. Блокировка наших сообщений
channel.setDiscardOwnMessages(true);
Если мы не хотим видеть наши сообщения, мы можем установить для этого свойство:
Когда мы запускаем предыдущий тест, отправитель сообщения не получает свое широковещательное сообщение.
5.3. Прямые сообщения
Для отправки прямого сообщения требуется действительный адрес. Если мы обращаемся к узлам по имени, нам нужен способ поиска адреса. К счастью, у нас есть View для этого.
private Optional<address> getAddress(String name) {
View view = channel.view();
return view.getMembers().stream()
.filter(address -> name.equals(address.toString()))
.findAny();
}
Текущее представление всегда доступно из JChannel:
Имена адресов доступны через метод класса toString(), поэтому мы просто ищем нужное имя в списке членов кластера.
Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName)
.orElseThrow(() -> new Exception("Destination not found");
Message message = new Message(destination, "Hi there!");
channel.send(message);
Итак, мы можем принять имя из консоли, найти связанный пункт назначения и отправить прямое сообщение:
6. Получение сообщений
Мы можем отправлять сообщения, теперь давайте добавим попытку их получения в настоящее время.
public void receive(Message message) {
String line = Message received from: "
+ message.getSrc()
+ " to: " + message.getDest()
+ " -> " + message.getObject();
System.out.println(line);
}
Давайте переопределим пустой метод получения ReceiverAdaptor:
Поскольку мы знаем, что сообщение содержит строку, мы можем безопасно передать getObject() в System.out.
7. Обмен состояниями
Когда узел входит в сеть, ему может потребоваться получить информацию о состоянии кластера. JGroups предоставляет для этого механизм передачи состояния.
Когда узел присоединяется к кластеру, он просто вызывает getState(). Кластер обычно получает состояние от самого старшего члена группы — координатора.
private Integer messageCount = 0;
public void receive(Message message) {
String line = "Message received from: "
+ message.getSrc()
+ " to: " + message.getDest()
+ " -> " + message.getObject();
System.out.println(line);
if (message.getDest() == null) {
messageCount++;
System.out.println("Message count: " + messageCount);
}
}
Давайте добавим счетчик широковещательных сообщений в наше приложение. Мы добавим новую переменную-член и увеличим ее внутри Receive():
Мы проверяем нулевой пункт назначения, потому что, если мы подсчитываем прямые сообщения, каждый узел будет иметь разный номер.
public void setState(InputStream input) {
try {
messageCount = Util.objectFromStream(new DataInputStream(input));
} catch (Exception e) {
System.out.println("Error deserialing state!");
}
System.out.println(messageCount + " is the current messagecount.");
}
public void getState(OutputStream output) throws Exception {
Util.objectToStream(messageCount, new DataOutputStream(output));
}
Далее мы переопределяем еще два метода в ReceiverAdaptor:
Подобно сообщениям, JGroups передает состояние в виде массива байтов.
JGroups предоставляет координатору InputStream для записи состояния и OutputStream для чтения новым узлом. API предоставляет удобные классы для сериализации и десериализации данных.
Обратите внимание, что в производственном коде доступ к информации о состоянии должен быть потокобезопасным.
channel.connect(clusterName);
channel.getState(null, 0);
Наконец, мы добавляем вызов getState() в наш запуск, после того как мы подключимся к кластеру:
getState() принимает пункт назначения, из которого запрашивается состояние, и время ожидания в миллисекундах. Нулевой пункт назначения указывает на координатора, а 0 означает отсутствие тайм-аута.
Когда мы запускаем это приложение с парой узлов и обмениваемся широковещательными сообщениями, мы видим увеличение количества сообщений.
Затем, если мы добавим третьего клиента или остановим и запустим один из них, мы увидим, что вновь подключенный узел печатает правильное количество сообщений.
8. Заключение
В этом уроке мы использовали JGroups для создания приложения для обмена сообщениями. Мы использовали API для отслеживания того, какие узлы подключались к кластеру и покидали его, а также для передачи состояния кластера новому узлу при его присоединении.