«1. Обзор

В этой статье мы рассмотрим бесконфликтные реплицированные типы данных (CRDT) и способы работы с ними в Java. Для наших примеров мы будем использовать реализации из библиотеки wurmloch-crdt.

Когда у нас есть кластер из N узлов-реплик в распределенной системе, мы можем столкнуться с сетевым разделом — некоторые узлы временно не могут общаться друг с другом. Эта ситуация называется разделенным мозгом.

Когда у нас в системе разделенный мозг, некоторые запросы на запись — даже для одного и того же пользователя — могут отправляться на разные реплики, не связанные друг с другом. Когда возникает такая ситуация, наша система по-прежнему доступна, но не является согласованной.

Нам нужно решить, что делать с несогласованными записями и данными, когда сеть между двумя разделенными кластерами снова начнет работать.

2. Бесконфликтные реплицированные типы данных в помощь

Давайте рассмотрим два узла, A и B, которые стали отключены из-за разделения мозга.

Допустим, пользователь меняет свой логин и запрос идет к узлу А. Затем он решает изменить его снова, но на этот раз запрос идет к узлу Б.

Из-за разделения- мозг, два узла не связаны. Нам нужно решить, как должен выглядеть логин этого пользователя, когда сеть снова заработает.

Мы можем использовать несколько стратегий: мы можем предоставить возможность разрешения конфликтов пользователю (как это делается в Google Docs), или мы можем использовать CRDT для слияния данных из разрозненных реплик для нас.

3. Зависимость Maven

Во-первых, давайте добавим в библиотеку зависимость, которая предоставляет набор полезных CRDT:

<dependency>
    <groupId>com.netopyr.wurmloch</groupId>
    <artifactId>wurmloch-crdt</artifactId>
    <version>0.1.0</version>
</dependency>

Последнюю версию можно найти на Maven Central.

4. Набор только для роста

Самый простой CRDT — это набор только для роста. Элементы можно только добавлять в GSet и никогда не удалять. Когда GSet расходится, его можно легко объединить, вычислив объединение двух наборов.

Во-первых, давайте создадим две реплики для имитации распределенной структуры данных и соединим эти две реплики с помощью метода connect():

LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

Получив две реплики в нашем кластере, мы можем создать GSet на первой реплику и сослаться на нее на второй реплике:

GSet<String> replica1 = crdtStore1.createGSet("ID_1");
GSet<String> replica2 = crdtStore2.<String>findGSet("ID_1").get();

На данный момент наш кластер работает как положено, и между двумя репликами есть активное соединение. Мы можем добавить в набор два элемента с двух разных реплик и утверждать, что набор содержит одни и те же элементы на обеих репликах:

replica1.add("apple");
replica2.add("banana");

assertThat(replica1).contains("apple", "banana");
assertThat(replica2).contains("apple", "banana");

Допустим, что вдруг у нас есть сетевой раздел и нет связи между первой и второй реплики. Мы можем смоделировать сетевой раздел, используя метод disconnect():

crdtStore1.disconnect(crdtStore2);

Далее, когда мы добавляем элементы в набор данных из обеих реплик, эти изменения не видны глобально, потому что между ними нет связи:

replica1.add("strawberry");
replica2.add("pear");

assertThat(replica1).contains("apple", "banana", "strawberry");
assertThat(replica2).contains("apple", "banana", "pear");

Как только соединение между обоими элементами кластера снова установлено, GSet объединяется внутри с использованием объединения обоих наборов, и обе реплики снова становятся согласованными:

crdtStore1.connect(crdtStore2);

assertThat(replica1)
  .contains("apple", "banana", "strawberry", "pear");
assertThat(replica2)
  .contains("apple", "banana", "strawberry", "pear");

5. Счетчик только для увеличения

Увеличение- Единственный счетчик — это CRDT, который агрегирует все приращения локально на каждом узле.

Когда реплики синхронизируются, после сетевого раздела результирующее значение вычисляется путем суммирования всех приращений на всех узлах — это похоже на LongAdder из java.concurrent, но на более высоком уровне абстракции.

Давайте создадим инкрементный счетчик с помощью GCounter и будем увеличивать его с обеих реплик. Мы видим, что сумма вычисляется правильно:

LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

GCounter replica1 = crdtStore1.createGCounter("ID_1");
GCounter replica2 = crdtStore2.findGCounter("ID_1").get();

replica1.increment();
replica2.increment(2L);

assertThat(replica1.get()).isEqualTo(3L);
assertThat(replica2.get()).isEqualTo(3L);

Когда мы отключаем оба элемента кластера и выполняем операции локального увеличения, мы видим, что значения несовместимы:

crdtStore1.disconnect(crdtStore2);

replica1.increment(3L);
replica2.increment(5L);

assertThat(replica1.get()).isEqualTo(6L);
assertThat(replica2.get()).isEqualTo(8L);

Но как только кластер снова становится работоспособным , приращения будут объединены, что даст правильное значение:

crdtStore1.connect(crdtStore2);

assertThat(replica1.get())
  .isEqualTo(11L);
assertThat(replica2.get())
  .isEqualTo(11L);

6. Счетчик PN

Используя аналогичное правило для счетчика только приращений, мы можем создать счетчик, который может как увеличиваться, так и уменьшаться. PNCounter хранит все приращения и уменьшения отдельно.

При синхронизации реплик результирующее значение будет равно сумме всех приращений минус сумма всех уменьшений:

@Test
public void givenPNCounter_whenReplicasDiverge_thenMergesWithoutConflict() {
    LocalCrdtStore crdtStore1 = new LocalCrdtStore();
    LocalCrdtStore crdtStore2 = new LocalCrdtStore();
    crdtStore1.connect(crdtStore2);

    PNCounter replica1 = crdtStore1.createPNCounter("ID_1");
    PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get();

    replica1.increment();
    replica2.decrement(2L);

    assertThat(replica1.get()).isEqualTo(-1L);
    assertThat(replica2.get()).isEqualTo(-1L);

    crdtStore1.disconnect(crdtStore2);

    replica1.decrement(3L);
    replica2.increment(5L);

    assertThat(replica1.get()).isEqualTo(-4L);
    assertThat(replica2.get()).isEqualTo(4L);

    crdtStore1.connect(crdtStore2);

    assertThat(replica1.get()).isEqualTo(1L);
    assertThat(replica2.get()).isEqualTo(1L);
}

«

«7. Реестр «последний писатель выигрывает»

Иногда у нас есть более сложные бизнес-правила, и работы с наборами или счетчиками недостаточно. Мы можем использовать регистр Last-Writer-Wins, который сохраняет только последнее обновленное значение при слиянии расходящихся наборов данных. Кассандра использует эту стратегию для разрешения конфликтов.

Мы должны быть очень осторожны при использовании этой стратегии, потому что она отбрасывает изменения, которые произошли за это время.

LocalCrdtStore crdtStore1 = new LocalCrdtStore("N_1");
LocalCrdtStore crdtStore2 = new LocalCrdtStore("N_2");
crdtStore1.connect(crdtStore2);

LWWRegister<String> replica1 = crdtStore1.createLWWRegister("ID_1");
LWWRegister<String> replica2 = crdtStore2.<String>findLWWRegister("ID_1").get();

replica1.set("apple");
replica2.set("banana");

assertThat(replica1.get()).isEqualTo("banana");
assertThat(replica2.get()).isEqualTo("banana");

Давайте создадим кластер из двух реплик и экземпляров класса LWWRegister:

Когда первая реплика устанавливает значение apple, а вторая меняет его на банан, LWWRegister сохраняет только последнее значение.

crdtStore1.disconnect(crdtStore2);

replica1.set("strawberry");
replica2.set("pear");

assertThat(replica1.get()).isEqualTo("strawberry");
assertThat(replica2.get()).isEqualTo("pear");

Давайте посмотрим, что произойдет, если кластер отключится:

Каждая реплика хранит свою локальную копию несогласованных данных. Когда мы вызываем метод set(), LWWRegister внутренне присваивает специальное значение версии, которое идентифицирует конкретное обновление для каждого использования алгоритма VectorClock.

crdtStore1.connect(crdtStore2);

assertThat(replica1.get()).isEqualTo("pear");
assertThat(replica2.get()).isEqualTo("pear");

При синхронизации кластер принимает значение с самой высокой версией и отбрасывает все предыдущие обновления:

8. Заключение

В этой статье мы показали проблему согласованности распределенных систем при сохранении доступности .

В случае сетевых разделов нам необходимо объединить расходящиеся данные при синхронизации кластера. Мы увидели, как использовать CRDT для объединения разнородных данных.