«1. Введение

Apache Pulsar — ​​это распределенная система обмена сообщениями с открытым исходным кодом, основанная на публикации/подписке, разработанная в Yahoo.

Он был создан для поддержки критически важных приложений Yahoo, таких как Yahoo Mail, Yahoo Finance, Yahoo Sports и т. д. Затем, в 2016 году, он был открыт в рамках Apache Software Foundation.

2. Архитектура

Pulsar — ​​это многопользовательское высокопроизводительное решение для обмена сообщениями между серверами. Он состоит из набора брокеров и букмекеров, а также встроенного Apache ZooKeeper для настройки и управления. Букмекеры от Apache BookKeeper, которые обеспечивают хранение сообщений до тех пор, пока они не будут использованы.

В кластере у нас будет:

    Несколько брокеров кластера для обработки входящих сообщений от производителей и отправки сообщений потребителям Apache BookKeeper для поддержки сохраняемости сообщений Apache ZooKeeper для хранения конфигурации кластера

Чтобы лучше понять это , давайте посмотрим на схему архитектуры из документации:

3. Основные возможности

Давайте начнем с краткого обзора некоторых ключевых функций:

    Встроенная поддержка нескольких кластеров Поддержка георепликации сообщения в нескольких кластерах Несколько режимов подписки Масштабируемость до миллионов тем Использует Apache BookKeeper для гарантии доставки сообщений. Низкая задержка

Теперь давайте подробно обсудим некоторые ключевые функции.

3.1. Модель обмена сообщениями

Платформа предоставляет гибкую модель обмена сообщениями. В общих архитектурах обмена сообщениями есть две модели обмена сообщениями, т. е. организация очереди и издатель/подписчик. Издатель/подписчик — это широковещательная система обмена сообщениями, в которой сообщение отправляется всем потребителям. С другой стороны, очередь — это двухточечная связь.

Pulsar объединяет обе концепции в одном обобщенном API. Издатель публикует сообщения в разные темы. Затем эти сообщения транслируются на все подписки.

Потребители подписываются, чтобы получать сообщения. Библиотека позволяет потребителям выбирать различные способы использования сообщений в одной и той же подписке, включая эксклюзивный, общий и отказоустойчивый. Мы подробно обсудим эти типы подписки в последующих разделах.

3.2. Режимы развертывания

Pulsar имеет встроенную поддержку развертывания в различных средах. Это означает, что мы можем использовать его на стандартных локальных компьютерах или развернуть в кластере Kubernetes, Google или AWS Cloud.

Он может быть выполнен как отдельный узел для целей разработки и тестирования. В этом случае все компоненты (брокер, BookKeeper и ZooKeeper) выполняются в одном процессе.

3.3. Георепликация

Библиотека предоставляет готовую поддержку георепликации данных. Мы можем включить репликацию сообщений между несколькими кластерами, настроив разные географические регионы.

Данные сообщения реплицируются почти в реальном времени. В случае сбоя сети в кластерах данные всегда в безопасности и сохраняются в BookKeeper. Система репликации продолжает повторять попытки до тех пор, пока репликация не завершится успешно.

Функция георепликации также позволяет организации развертывать Pulsar в разных облачных провайдерах и реплицировать данные. Это помогает им избежать использования проприетарных API-интерфейсов облачных провайдеров.

3.4. Постоянство

После того, как Pulsar прочитает и подтвердит данные, он гарантирует отсутствие потери данных. Долговечность данных связана с количеством дисков, настроенных для хранения данных.

Pulsar обеспечивает надежность, используя букмекерские конторы (экземпляр Apache BookKeeper), работающие в узлах хранения. Всякий раз, когда букмекер получает сообщение, он сохраняет копию в памяти, а также записывает данные в WAL (Write Ahead Log). Этот журнал работает так же, как база данных WAL. Букмекеры работают по принципу транзакций базы данных и гарантируют, что данные не будут потеряны даже в случае сбоя машины.

«Помимо вышеперечисленного, Pulsar также может выдерживать множественные сбои узлов. Библиотека реплицирует данные нескольким букмекерам, а затем отправляет сообщение с подтверждением производителю. Этот механизм гарантирует нулевую потерю данных даже в случае множественных отказов оборудования.

4. Настройка одного узла

Теперь давайте посмотрим, как настроить кластер Apache Pulsar с одним узлом.

Apache также предоставляет простой клиентский API с привязками для Java, Python и C++. Позже мы создадим простой пример производителя и подписки Java.

4.1. Установка

Apache Pulsar доступен в виде бинарного дистрибутива. Давайте начнем с его загрузки:

wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-2.1.1-incubating-bin.tar.gz

Когда загрузка будет завершена, мы можем разархивировать zip-файл. Разархивированная раздача будет содержать папку bin, conf, example, licenses и lib.

После этого нам нужно скачать встроенные коннекторы. Теперь они поставляются в виде отдельного пакета:

wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz

Давайте разархивируем соединители и скопируем папку Connectors в папку Pulsar.

4.2. Запуск экземпляра

Чтобы запустить автономный экземпляр, мы можем выполнить:

bin/pulsar standalone

5. Java-клиент

Теперь мы создадим проект Java для создания и использования сообщений. Мы также создадим примеры для разных типов подписки.

5.1. Настройка проекта

Мы начнем с добавления в наш проект зависимости pulsar-client:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.1.1-incubating</version>
</dependency>

5.2. Producer

Давайте продолжим, создав пример Producer. Здесь мы создадим тему и производителя.

Во-первых, нам нужно создать PulsarClient, который будет подключаться к сервису Pulsar на определенном хосте и порту, используя собственный протокол. Многие производители и потребители могут совместно использовать один клиентский объект.

Теперь мы создадим производителя с определенным названием темы:

private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";
PulsarClient client = PulsarClient.builder()
  .serviceUrl(SERVICE_URL)
  .build();

Producer<byte[]> producer = client.newProducer()
  .topic(TOPIC_NAME)
  .compressionType(CompressionType.LZ4)
  .create();

IntStream.range(1, 5).forEach(i -> {
    String content = String.format("hi-pulsar-%d", i);

    Message<byte[]> msg = MessageBuilder.create()
      .setContent(content.getBytes())
      .build();
    MessageId msgId = producer.send(msg);
});

Производитель отправит 5 сообщений:

5.3. Потребитель

Consumer<byte[]> consumer = client.newConsumer()
  .topic(TOPIC_NAME)
  .subscriptionType(SubscriptionType.Shared)
  .subscriptionName(SUBSCRIPTION_NAME)
  .subscribe();

Далее мы создадим потребителя, чтобы получать сообщения, созданные производителем. Потребителю также требуется тот же PulsarClient для подключения к нашему серверу:

Здесь мы создали клиент с типом подписки Shared. Это позволяет нескольким потребителям подключаться к одной и той же подписке и получать сообщения.

5.4. Типы подписки для потребителя

В приведенном выше примере потребителя мы создали подписку с общим типом. Мы также можем создавать эксклюзивные и отказоустойчивые подписки.

Эксклюзивная подписка позволяет подписаться только одному потребителю.

С другой стороны, отказоустойчивая подписка позволяет пользователю определить резервного потребителя на случай, если один потребитель выйдет из строя, как показано на этой диаграмме Apache:

6. Заключение

В этой статье мы выделили функции системы обмена сообщениями Pulsar, такие как модель обмена сообщениями, георепликация и надежные гарантии надежности.

Мы также узнали, как настроить отдельный узел и как использовать Java-клиент.