«1. Обзор
В этом руководстве мы увидим, как мы можем добавить обмен сообщениями MQTT в проект Java, используя библиотеки, предоставленные проектом Eclipse Paho.
2. MQTT Primer
MQTT (MQ Telemetry Transport) — это протокол обмена сообщениями, который был создан для удовлетворения потребности в простом и легком методе передачи данных на/с устройств с низким энергопотреблением, таких как те, которые используются в промышленных Приложения.
С ростом популярности устройств IoT (Интернет вещей) MQTT стал использоваться все шире, что привело к его стандартизации OASIS и ISO.
Протокол поддерживает единый шаблон обмена сообщениями, а именно шаблон публикации-подписки: каждое сообщение, отправляемое клиентом, содержит связанную «тему», которая используется брокером для маршрутизации его подписавшимся клиентам. Имена тем могут быть простыми строками, такими как «oiltemp» или строкой в виде пути «motor/1/rpm».
Чтобы получать сообщения, клиент подписывается на одну или несколько тем, используя ее точное название или строку, содержащую один из поддерживаемых подстановочных знаков («#» для многоуровневых тем и «+» для одноуровневый»).
3. Настройка проекта
Чтобы включить библиотеку Paho в проект Maven, мы должны добавить следующую зависимость:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
Последнюю версию модуля библиотеки Eclipse Paho Java можно загрузить с Центральный Мейвен.
4. Настройка клиента
При использовании библиотеки Paho первое, что нам нужно сделать, чтобы отправлять и/или получать сообщения от брокера MQTT, — это получить реализацию интерфейса IMqttClient. Этот интерфейс содержит все методы, необходимые приложению для установления соединения с сервером, отправки и получения сообщений.
Paho поставляется с двумя реализациями этого интерфейса: асинхронной (MqttAsyncClient) и синхронной (MqttClient). В нашем случае мы сосредоточимся на синхронной версии, которая имеет более простую семантику.
Сама установка представляет собой двухэтапный процесс: сначала мы создаем экземпляр класса MqttClient, а затем подключаем его к нашему серверу. В следующем подразделе подробно описаны эти шаги.
4.1. Создание нового экземпляра IMqttClient
В следующем фрагменте кода показано, как создать новый синхронный экземпляр IMqttClient:
String publisherId = UUID.randomUUID().toString();
IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);
В этом случае мы используем простейший доступный конструктор, который принимает адрес конечной точки нашего брокера MQTT. и идентификатор клиента, который однозначно идентифицирует нашего клиента.
В нашем случае мы использовали случайный UUID, поэтому при каждом запуске будет генерироваться новый идентификатор клиента.
Paho также предоставляет дополнительные конструкторы, которые мы можем использовать для настройки механизма сохраняемости, используемого для хранения неподтвержденных сообщений, и/или ScheduledExecutorService, используемого для запуска фоновых задач, требуемых реализацией механизма протокола.
Конечная точка сервера, которую мы используем, — это общедоступный брокер MQTT, размещенный в проекте Paho, который позволяет любому, у кого есть подключение к Интернету, тестировать клиентов без необходимости какой-либо аутентификации.
4.2. Подключение к серверу
Наш недавно созданный экземпляр MqttClient не подключен к серверу. Мы делаем это, вызывая его метод connect(), при необходимости передавая экземпляр MqttConnectOptions, который позволяет нам настраивать некоторые аспекты протокола.
В частности, мы можем использовать эти параметры для передачи дополнительной информации, такой как учетные данные безопасности, режим восстановления сеанса, режим повторного подключения и так далее.
Класс MqttConnectionOptions предоставляет эти параметры как простые свойства, которые мы можем установить с помощью обычных методов установки. Нам нужно только установить свойства, необходимые для нашего сценария — остальные примут значения по умолчанию.
Код, используемый для установления соединения с сервером, обычно выглядит следующим образом:
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);
Здесь мы определяем наши параметры подключения, чтобы:
-
Библиотека автоматически попыталась повторно подключиться к серверу в случае сбоя сети Неотправленные сообщения из предыдущего запуска будут удалены. Время ожидания соединения установлено на 10 секунд
5. Отправка сообщений
«Отправка сообщений с помощью уже подключенного MqttClient очень проста. Мы используем один из вариантов метода publish() для отправки полезной нагрузки, которая всегда представляет собой массив байтов, в заданную тему, используя один из следующих параметров качества обслуживания:
-
0 — не более однажды» семантика, также известная как «выстрелил и забыл». Используйте эту опцию, когда потеря сообщения приемлема, так как она не требует какого-либо подтверждения или постоянства 1 — семантика «по крайней мере один раз». Используйте этот вариант, если потеря сообщений недопустима и ваши подписчики могут обрабатывать дубликаты 2 — семантика «ровно один раз». Используйте эту опцию, когда потеря сообщений неприемлема и ваши подписчики не могут обрабатывать дубликаты
В нашем примере проекта класс EngineTemperatureSensor играет роль фиктивного датчика, который выдает новое значение температуры каждый раз, когда мы вызываем его метод call().
Этот класс реализует интерфейс Callable, поэтому мы можем легко использовать его с одной из реализаций ExecutorService, доступных в пакете java.util.concurrent:
public class EngineTemperatureSensor implements Callable<Void> {
// ... private members omitted
public EngineTemperatureSensor(IMqttClient client) {
this.client = client;
}
@Override
public Void call() throws Exception {
if ( !client.isConnected()) {
return null;
}
MqttMessage msg = readEngineTemp();
msg.setQos(0);
msg.setRetained(true);
client.publish(TOPIC,msg);
return null;
}
private MqttMessage readEngineTemp() {
double temp = 80 + rnd.nextDouble() * 20.0;
byte[] payload = String.format("T:%04.2f",temp)
.getBytes();
return new MqttMessage(payload);
}
}
MqttMessage инкапсулирует саму полезную нагрузку, запрошенное качество Service, а также сохраненный флаг для сообщения. Этот флаг указывает брокеру, что он должен сохранять это сообщение до тех пор, пока оно не будет использовано подписчиком.
Мы можем использовать эту функцию для реализации «последнего известного хорошего» поведения, поэтому, когда новый подписчик подключается к серверу, он сразу же получает сохраненное сообщение.
6. Получение сообщений
Чтобы получать сообщения от MQTT-брокера, нам нужно использовать один из вариантов метода subscribe(), который позволяет указать:
-
Один или несколько тематических фильтров для сообщений, которые мы хотите получить Связанное QoS Обработчик обратного вызова для обработки полученных сообщений
В следующем примере мы покажем, как добавить прослушиватель сообщений к существующему экземпляру IMqttClient для получения сообщений из заданной темы. Мы используем CountDownLatch в качестве механизма синхронизации между нашим обратным вызовом и основным потоком выполнения, уменьшая его каждый раз, когда приходит новое сообщение.
В примере кода мы использовали другой экземпляр IMqttClient для получения сообщений. Мы сделали это просто для того, чтобы было понятнее, какой клиент что делает, но это не ограничение Paho — при желании вы можете использовать один и тот же клиент для публикации и получения сообщений:
CountDownLatch receivedSignal = new CountDownLatch(10);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
byte[] payload = msg.getPayload();
// ... payload handling omitted
receivedSignal.countDown();
});
receivedSignal.await(1, TimeUnit.MINUTES);
Вариант subscribe() используемый выше, принимает экземпляр IMqttMessageListener в качестве второго аргумента.
В нашем случае мы используем простую лямбда-функцию, которая обрабатывает полезную нагрузку и уменьшает значение счетчика. Если в указанное временное окно (1 минута) поступает недостаточно сообщений, метод await() выдает исключение.
При использовании Paho нам не нужно явно подтверждать получение сообщения. Если обратный вызов возвращается нормально, Paho считает его успешным и отправляет подтверждение на сервер.
Если обратный вызов выдает исключение, клиент будет закрыт. Обратите внимание, что это приведет к потере всех сообщений, отправленных с уровнем QoS 0.
Сообщения, отправленные с уровнем QoS 1 или 2, будут повторно отправлены сервером, как только клиент повторно подключится и снова подпишется на тему.
7. Заключение
В этой статье мы продемонстрировали, как мы можем добавить поддержку протокола MQTT в наши Java-приложения, используя библиотеку, предоставленную проектом Eclipse Paho.
Эта библиотека обрабатывает все низкоуровневые детали протокола, что позволяет нам сосредоточиться на других аспектах нашего решения, оставляя при этом достаточно места для настройки важных аспектов его внутренних функций, таких как сохраняемость сообщений.
Код, показанный в этой статье, доступен на GitHub.