«1. Обзор

В этой статье мы представим BookKeeper — сервис, реализующий распределенную отказоустойчивую систему хранения записей.

2. Что такое BookKeeper?

BookKeeper был первоначально разработан Yahoo как подпроект ZooKeeper и стал проектом высшего уровня в 2015 году. По своей сути BookKeeper стремится быть надежной и высокопроизводительной системой, которая хранит последовательности записей журнала (также известных как записи). в структурах данных, называемых регистрами.

Важной особенностью леджеров является то, что они неизменяемы и предназначены только для добавления. Это делает BookKeeper хорошим кандидатом для определенных приложений, таких как системы распределенного ведения журналов, приложения для обмена сообщениями Pub-Sub и потоковая обработка в реальном времени.

3. Основные понятия бухгалтера

3.1. Записи журнала

Запись журнала содержит неделимую единицу данных, которую клиентское приложение сохраняет или считывает из BookKeeper. При сохранении в реестре каждая запись содержит предоставленные данные и несколько полей метаданных.

Эти поля метаданных включают в себя entryId, который должен быть уникальным в рамках данной книги. Существует также код аутентификации, который BookKeeper использует для обнаружения повреждения или подделки записи.

BookKeeper сам по себе не предлагает функции сериализации, поэтому клиенты должны разработать собственный метод для преобразования конструкций более высокого уровня в массивы байтов и из них.

3.2. Книги

Книги — это основная единица хранения, управляемая BookKeeper, в которой хранится упорядоченная последовательность записей журнала. Как упоминалось ранее, реестры имеют семантику только добавления, что означает, что записи не могут быть изменены после добавления в них.

Кроме того, как только клиент прекращает запись в реестр и закрывает его, BookKeeper запечатывает его, и мы больше не можем добавлять в него данные даже позднее. Это важный момент, о котором следует помнить при разработке приложения для BookKeeper. Реестры не являются хорошим кандидатом для прямой реализации высокоуровневых конструкций, таких как очередь. Вместо этого мы видим, что реестры чаще используются для создания более базовых структур данных, которые поддерживают эти концепции более высокого уровня.

Например, проект распределенного журнала Apache использует регистры в качестве сегментов журнала. Эти сегменты объединяются в распределенные журналы, но лежащие в их основе реестры прозрачны для обычных пользователей.

BookKeeper обеспечивает устойчивость реестра за счет репликации записей журнала на нескольких экземплярах сервера. Три параметра управляют количеством сохраняемых серверов и копий:

    Размер ансамбля: количество серверов, используемых для записи данных реестра. Размер кворума записи: количество серверов, используемых для репликации данной записи журнала. Размер кворума подтверждения: количество серверов, которые должен подтвердить данную операцию записи в журнал

Регулируя эти параметры, мы можем настроить характеристики производительности и устойчивости данного реестра. При записи в реестр BookKeeper будет считать операцию успешной только тогда, когда ее подтвердит минимальный кворум членов кластера.

В дополнение к своим внутренним метаданным BookKeeper также поддерживает добавление пользовательских метаданных в бухгалтерскую книгу. Это карта пар ключ/значение, которую клиенты передают во время создания, и BookKeeper сохраняет в ZooKeeper вместе со своей собственной.

3.3. Букмекерские конторы

Букмекерские конторы — это серверы, на которых хранятся один или несколько режимов реестра. Кластер BookKeeper состоит из нескольких букмекерских контор, работающих в заданной среде и предоставляющих услуги клиентам через обычные соединения TCP или TLS.

Букмекеры координируют действия с помощью кластерных сервисов, предоставляемых ZooKeeper. Это означает, что если мы хотим получить полностью отказоустойчивую систему, нам потребуется как минимум 3 экземпляра ZooKeeper и 3 экземпляра BookKeeper. Такая установка сможет выдержать потерю в случае сбоя любого отдельного экземпляра и по-прежнему сможет нормально работать, по крайней мере, для настройки реестра по умолчанию: размер ансамбля из 3 узлов, кворум записи из 2 узлов и кворум подтверждения из 2 узлов.

4. Локальная настройка

«Основные требования для локального запуска BookKeeper довольно скромные. Во-первых, нам нужен запущенный и работающий экземпляр ZooKeeper, который обеспечивает хранилище метаданных бухгалтерской книги для BookKeeper. Далее мы развертываем букмекерскую контору, которая оказывает клиентам реальные услуги.

Хотя эти шаги, безусловно, можно выполнить вручную, здесь мы будем использовать файл docker-compose, который использует официальные образы Apache, чтобы упростить эту задачу:

$ cd <path to docker-compose.yml>
$ docker-compose up

Этот docker-compose создает три букмекера и экземпляр ZooKeeper. . Поскольку все букмекерские конторы работают на одной машине, это полезно только для целей тестирования. Официальная документация содержит необходимые шаги для настройки полностью отказоустойчивого кластера.

Давайте выполним базовый тест, чтобы убедиться, что он работает должным образом, используя команду listbookies оболочки бухгалтерского учета:

$ docker exec -it apache-bookkeeper_bookie_1 /opt/bookkeeper/bin/bookkeeper \
  shell listbookies -readwrite
ReadWrite Bookies :
192.168.99.101(192.168.99.101):4181
192.168.99.101(192.168.99.101):4182
192.168.99.101(192.168.99.101):3181

Вывод показывает список доступных букмекеров, состоящий из трех букмекеров. Обратите внимание, что отображаемые IP-адреса будут меняться в зависимости от особенностей локальной установки Docker.

5. Использование Ledger API

Ledger API — это самый простой способ взаимодействия с BookKeeper. Он позволяет нам напрямую взаимодействовать с объектами Ledger, но, с другой стороны, не поддерживает прямую поддержку абстракций более высокого уровня, таких как потоки. Для этих случаев проект BookKeeper предлагает другую библиотеку, DistributedLog, которая поддерживает эти функции.

Использование Ledger API требует добавления в наш проект зависимости bookkeeper-server:

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server</artifactId>
    <version>4.10.0</version>
</dependency>

ПРИМЕЧАНИЕ. Как указано в документации, использование этой зависимости также будет включать зависимости для библиотек protobuf и guava. Если нашему проекту также потребуются эти библиотеки, но в версии, отличной от той, что используется BookKeeper, мы могли бы использовать альтернативную зависимость, которая затеняет эти библиотеки:

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server-shaded</artifactId>
    <version>4.10.0</version>
</dependency>

5.1. Подключение к букмекерам

Класс BookKeeper — это основная точка входа в Ledger API, предоставляющая несколько методов для подключения к нашему сервису BookKeeper. В самой простой форме все, что нам нужно сделать, это создать новый экземпляр этого класса, передав адрес одного из серверов ZooKeeper, используемых BookKeeper:

BookKeeper client = new BookKeeper("zookeeper-host:2131");

Здесь zookeeper-host должен быть установлен на IP-адрес или имя хоста сервера ZooKeeper, на котором хранится конфигурация кластера BookKeeper. В нашем случае это обычно «localhost» или хост, на который указывает переменная среды DOCKER_HOST.

Если нам нужно больше контроля над несколькими параметрами, доступными для тонкой настройки нашего клиента, мы можем использовать экземпляр ClientConfiguration и использовать его для создания нашего клиента:

ClientConfiguration cfg = new ClientConfiguration();
cfg.setMetadataServiceUri("zk+null://zookeeper-host:2131");

// ... set other properties
 
BookKeeper.forConfig(cfg).build();

5.2. Создание реестра

Когда у нас есть экземпляр BookKeeper, создание нового реестра становится простым:

LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC,"password".getBytes());

Здесь мы использовали самый простой вариант этого метода. Он создаст новый реестр с настройками по умолчанию, используя тип дайджеста MAC для обеспечения целостности записи.

Если мы хотим добавить пользовательские метаданные в нашу книгу, нам нужно использовать вариант, который принимает все параметры:

LedgerHandle lh = bk.createLedger(
  3,
  2,
  2,
  DigestType.MAC,
  "password".getBytes(),
  Collections.singletonMap("name", "my-ledger".getBytes()));

На этот раз мы использовали полную версию метода createLedger(). Три первых аргумента — это размер ансамбля, значения кворума записи и кворума подтверждения соответственно. Далее у нас те же параметры дайджеста, что и раньше. Наконец, мы передаем карту с нашими пользовательскими метаданными.

В обоих вышеприведенных случаях createLedger является синхронной операцией. BookKeeper также предлагает асинхронное создание реестра с помощью обратного вызова:

bk.asyncCreateLedger(
  3,
  2,
  2,
  BookKeeper.DigestType.MAC, "passwd".getBytes(),
  (rc, lh, ctx) -> {
      // ... use lh to access ledger operations
  },
  null,
  Collections.emptyMap());

Более новые версии BookKeeper (\u003e= 4.6) также поддерживают API в стиле Fluent и CompletableFuture для достижения той же цели:

CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
  .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
  .withPassword("password".getBytes())
  .execute();

Обратите внимание, что в в этом случае мы получаем WriteHandle вместо LedgerHandle. Как мы увидим позже, мы можем использовать любой из них для доступа к нашей книге, поскольку LedgerHandle реализует WriteHandle.

5.3. Запись данных

Как только мы получили LedgerHandle или WriteHandle, мы записываем данные в связанный реестр, используя один из вариантов метода append(). Начнем с синхронного варианта:

for(int i = 0; i < MAX_MESSAGES; i++) {
    byte[] data = new String("message-" + i).getBytes();
    lh.append(data);
}

Здесь мы используем вариант, который принимает массив байтов. API также поддерживает ByteBuf от Netty и ByteBuffer от Java NIO, которые позволяют лучше управлять памятью в критичных по времени сценариях.

«Для асинхронных операций API немного отличается в зависимости от конкретного типа дескриптора, который мы получили. WriteHandle использует CompletableFuture, тогда как LedgerHandle также поддерживает методы на основе обратного вызова:

// Available in WriteHandle and LedgerHandle
CompletableFuture<Long> f = lh.appendAsync(data);

// Available only in LedgerHandle
lh.asyncAddEntry(
  data,
  (rc,ledgerHandle,entryId,ctx) -> {
      // ... callback logic omitted
  },
  null);

Какой из них выбрать, в значительной степени зависит от личного выбора, но в целом использование API на основе CompletableFuture, как правило, легче для чтения. Кроме того, есть дополнительное преимущество, заключающееся в том, что мы можем создать Mono непосредственно из него, что упрощает интеграцию BookKeeper в реактивные приложения.

5.4. Чтение данных

Чтение данных из бухгалтерской книги BookKeeper аналогично записи. Во-первых, мы используем наш экземпляр BookKeeper для создания LedgerHandle:

LedgerHandle lh = bk.openLedger(
  ledgerId, 
  BookKeeper.DigestType.MAC,
  ledgerPassword);

За исключением параметра LedgerId, который мы рассмотрим позже, этот код очень похож на метод createLedger(), который мы видели раньше. Однако есть важное отличие; этот метод возвращает экземпляр LedgerHandle, доступный только для чтения. Если мы попытаемся использовать любой из доступных методов append(), все, что мы получим, — это исключение.

В качестве альтернативы, более безопасным способом является использование API в стиле Fluent:

ReadHandle rh = bk.newOpenLedgerOp()
  .withLedgerId(ledgerId)
  .withDigestType(DigestType.MAC)
  .withPassword("password".getBytes())
  .execute()
  .get();

ReadHandle имеет необходимые методы для чтения данных из нашей книги:

long lastId = lh.readLastConfirmed();
rh.read(0, lastId).forEach((entry) -> {
    // ... do something 
});

Здесь мы просто запросили все доступные данных в этой книге с использованием варианта синхронного чтения. Как и ожидалось, есть также асинхронный вариант:

rh.readAsync(0, lastId).thenAccept((entries) -> {
    entries.forEach((entry) -> {
        // ... process entry
    });
});

Если мы решим использовать старый метод openLedger(), мы найдем дополнительные методы, поддерживающие стиль обратного вызова для асинхронных методов:

lh.asyncReadEntries(
  0,
  lastId,
  (rc,lh,entries,ctx) -> {
      while(entries.hasMoreElements()) {
          LedgerEntry e = ee.nextElement();
      }
  },
  null);

5.5. Вывод реестров

Ранее мы видели, что нам нужен идентификатор реестра, чтобы открыть и прочитать его данные. Итак, как нам его получить? Один из способов — использовать интерфейс LedgerManager, к которому мы можем получить доступ из нашего экземпляра BookKeeper. Этот интерфейс в основном работает с метаданными леджера, но также имеет метод asyncProcessLedgers(). Используя этот метод — и некоторую помощь в создании параллельных примитивов — мы можем перечислить все доступные регистры:

public List listAllLedgers(BookKeeper bk) {
    List ledgers = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch processDone = new CountDownLatch(1);

    bk.getLedgerManager()
      .asyncProcessLedgers(
        (ledgerId, cb) -> {
            ledgers.add(ledgerId);
            cb.processResult(BKException.Code.OK, null, null);
        }, 
        (rc, s, obj) -> {
            processDone.countDown();
        },
        null,
        BKException.Code.OK,
        BKException.Code.ReadException);
 
    try {
        processDone.await(1, TimeUnit.MINUTES);
        return ledgers;
    } catch (InterruptedException ie) {
        throw new RuntimeException(ie);
    }
}

Давайте проанализируем этот код, который немного длиннее, чем ожидалось, для кажущейся тривиальной задачи. Метод asyncProcessLedgers() требует двух обратных вызовов.

Первый собирает все идентификаторы леджеров в список. Здесь мы используем синхронизированный список, потому что этот обратный вызов может быть вызван из нескольких потоков. Помимо идентификатора книги, этот обратный вызов также получает параметр обратного вызова. Мы должны вызвать его метод processResult(), чтобы подтвердить, что мы обработали данные, и сообщить, что мы готовы получить больше данных.

Второй обратный вызов вызывается, когда все регистры были отправлены обратному вызову процессора или при сбое. В нашем случае мы опустили обработку ошибок. Вместо этого мы просто уменьшаем значение CountDownLatch, что, в свою очередь, завершает операцию ожидания и позволяет методу вернуться со списком всех доступных реестров.

6. Заключение

В этой статье мы рассмотрели проект Apache BookKeeper, рассмотрели его основные концепции и использовали его низкоуровневый API для доступа к реестрам и выполнения операций чтения/записи.

Как обычно, весь код доступен на GitHub.