«1. Обзор

Большинству распределенных приложений требуется, чтобы какой-либо компонент с отслеживанием состояния был непротиворечивым и отказоустойчивым. Atomix — это встраиваемая библиотека, помогающая добиться отказоустойчивости и согласованности для распределенных ресурсов.

Он предоставляет богатый набор API для управления своими ресурсами, такими как коллекции, группы и инструменты для параллелизма.

Для начала нам нужно добавить следующую зависимость Maven в наш pom:

<dependency>
    <groupId>io.atomix</groupId>
    <artifactId>atomix-all</artifactId>
    <version>1.0.8</version>
</dependency>

Эта зависимость предоставляет транспорт на основе Netty, необходимый его узлам для связи друг с другом.

2. Начальная загрузка кластера

Чтобы начать работу с Atomix, нам нужно сначала загрузить кластер.

Atomix состоит из набора реплик, которые используются для создания распределенных ресурсов с отслеживанием состояния. Каждая реплика поддерживает копию состояния каждого ресурса, существующего в кластере.

Реплики в кластере бывают двух типов: активные и пассивные.

Изменения состояния распределенных ресурсов распространяются через активные реплики, в то время как пассивные реплики синхронизируются для обеспечения отказоустойчивости.

2.1. Начальная загрузка встроенного кластера

Для начальной загрузки кластера с одним узлом нам нужно сначала создать экземпляр AtomixReplica:

AtomixReplica replica = AtomixReplica.builder(
  new Address("localhost", 8700))
   .withStorage(storage)
   .withTransport(new NettyTransport())
   .build();

Здесь реплика настроена с хранилищем и транспортом. Фрагмент кода для объявления хранилища:

Storage storage = Storage.builder()
  .withDirectory(new File("logs"))
  .withStorageLevel(StorageLevel.DISK)
  .build();

После того, как реплика объявлена ​​и настроена с хранилищем и транспортом, мы можем загрузить ее, просто вызвав bootstrap(), которая возвращает CompletableFuture, который можно использовать для блокировки, пока сервер загружается вызовом связанного блокирующего метода join():

CompletableFuture<AtomixReplica> future = replica.bootstrap();
future.join();

Пока мы создали кластер с одним узлом. Теперь мы можем добавить к нему больше узлов.

Для этого нам нужно создать другие реплики и соединить их с существующим кластером; нам нужно создать новый поток для вызова метода join(Address):

AtomixReplica replica2 = AtomixReplica.builder(
  new Address("localhost", 8701))
    .withStorage(storage)
    .withTransport(new NettyTransport())
    .build();
  
replica2
  .join(new Address("localhost", 8700))
  .join();

AtomixReplica replica3 = AtomixReplica.builder(
  new Address("localhost", 8702))
    .withStorage(storage)
    .withTransport(new NettyTransport())
    .build();

replica3.join(
  new Address("localhost", 8700), 
  new Address("localhost", 8701))
  .join();

Теперь у нас есть загруженный кластер из трех узлов. В качестве альтернативы мы можем запустить кластер, передав список адресов в метод начальной загрузки (List\u003cAddress\u003e):

List<Address> cluster = Arrays.asList(
  new Address("localhost", 8700), 
  new Address("localhost", 8701), 
  new Address("localhsot", 8702));

AtomixReplica replica1 = AtomixReplica
  .builder(cluster.get(0))
  .build();
replica1.bootstrap(cluster).join();

AtomixReplica replica2 = AtomixReplica
  .builder(cluster.get(1))
  .build();
            
replica2.bootstrap(cluster).join();

AtomixReplica replica3 = AtomixReplica
  .builder(cluster.get(2))
  .build();

replica3.bootstrap(cluster).join();

Нам нужно создать новый поток для каждой реплики.

2.2. Начальная загрузка автономного кластера

Сервер Atomix можно запустить как автономный сервер, который можно загрузить с Maven Central. Проще говоря — это Java-архив, который можно запустить через терминал, указав

Проще говоря — это Java-архив, который можно запустить через терминал, указав параметр host: port во флаге адреса и используя -бутстрап флаг.

Вот команда для начальной загрузки кластера:

java -jar atomix-standalone-server.jar 
  -address 127.0.0.1:8700 -bootstrap -config atomix.properties

Здесь atomix.properties — это файл конфигурации для настройки хранилища и транспорта. Чтобы создать многоузловой кластер, мы можем добавить узлы в существующий кластер, используя флаг -join.

Формат для этого:

java -jar atomix-standalone-server.jar 
  -address 127.0.0.1:8701 -join 127.0.0.1:8700

3. Работа с клиентом

Atomix поддерживает создание клиента для удаленного доступа к своему кластеру через AtomixClient API.

Так как клиенты не должны сохранять состояние, AtomixClient не имеет никакого хранилища. Нам просто нужно настроить транспорт при создании клиента, так как транспорт будет использоваться для связи с кластером.

Давайте создадим клиент с транспортом:

AtomixClient client = AtomixClient.builder()
  .withTransport(new NettyTransport())
  .build();

Теперь нам нужно подключить клиента к кластеру.

Мы можем объявить список адресов и передать его в качестве аргумента методу connect() клиента:

client.connect(cluster)
  .thenRun(() -> {
      System.out.println("Client is connected to the cluster!");
  });

4. Обработка ресурсов

Истинная сила Atomix заключается в его мощном наборе API для создания распределенных ресурсов и управления ими. Ресурсы реплицируются и сохраняются в кластере, а также поддерживаются реплицированным конечным автоматом, который управляется базовой реализацией Raft Consensus Protocol.

Распределенные ресурсы могут создаваться и управляться одним из методов get(). Мы можем создать экземпляр распределенного ресурса из AtomixReplica.

Учитывая, что реплика является экземпляром AtomixReplica, фрагмент кода для создания ресурса распределенной карты и установки для него значения:

replica.getMap("map")
  .thenCompose(m -> m.put("bar", "Hello world!"))
  .thenRun(() -> System.out.println("Value is set in Distributed Map"))
  .join();

Здесь метод join() будет блокировать программу до тех пор, пока ресурс не будет создан и значение не будет установить на это. Мы можем получить тот же объект с помощью AtomixClient и получить значение с помощью метода get(\»bar\»).

«Мы можем использовать метод get() в конце, чтобы дождаться результата:

String value = client.getMap("map"))
  .thenCompose(m -> m.get("bar"))
  .thenApply(a -> (String) a)
  .get();

5. Непротиворечивость и отказоустойчивость

Atomix используется для критически важных небольших наборов данных, для которых согласованность намного выше. беспокойство, чем доступность.

Он обеспечивает строгую настраиваемую согласованность за счет линеаризации как для чтения, так и для записи. В линеаризуемости после фиксации записи все клиенты гарантированно будут осведомлены о результирующем состоянии.

Согласованность в кластере Atomix гарантируется базовым алгоритмом консенсуса Raft, где избранный лидер будет иметь все записи, которые ранее были успешными.

Все новые записи будут проходить через лидера кластера и синхронно реплицироваться на большую часть сервера перед завершением.

Для обеспечения отказоустойчивости большинство серверов кластера должны быть в рабочем состоянии. Если выйдет из строя меньшинство узлов, узлы будут помечены как неактивные и будут заменены пассивными узлами или резервными узлами.

В случае сбоя лидера оставшиеся серверы в кластере начнут новые выборы лидера. При этом кластер будет недоступен.

В случае раздела, если лидер находится на стороне разделения, не имеющей кворума, он уходит в отставку, и новый лидер избирается на стороне с кворумом.

И, если лидер на стороне большинства, то так и будет продолжаться без изменений. Когда разделение будет разрешено, узлы на стороне, не входящей в кворум, присоединятся к кворуму и соответствующим образом обновят свой журнал.

6. Заключение

Как и ZooKeeper, Atomix предоставляет надежный набор библиотек для решения задач распределенных вычислений.

И, как всегда, полный исходный код для этой задачи доступен на GitHub.