«1. Обзор

В этом руководстве мы узнаем, как реализовать кольцевой буфер в Java.

2. Кольцевой буфер

Кольцевой буфер (или циклический буфер) — это ограниченная кольцевая структура данных, которая используется для буферизации данных между двумя или более потоками. Поскольку мы продолжаем писать в кольцевой буфер, он циклически повторяется по мере достижения конца.

2.1. Как это работает

Кольцевой буфер реализован с использованием массива фиксированного размера, который охватывает границы.

Помимо массива, он отслеживает три вещи:

    следующий доступный слот в буфере для вставки элемента, следующий непрочитанный элемент в буфере и конец массива – точку в которые буфер переносит в начало массива

Механика того, как кольцевой буфер обрабатывает эти требования, зависит от реализации. Например, запись в Википедии по этому вопросу показывает метод, использующий четыре указателя.

Мы позаимствуем подход из реализации кольцевого буфера Disruptor с использованием последовательностей.

Первое, что нам нужно знать, это емкость — фиксированный максимальный размер буфера. Далее мы будем использовать две монотонно возрастающие последовательности:

    Последовательность записи: начиная с -1, увеличивается на 1 при вставке элемента Последовательность чтения: начинается с 0, увеличивается на 1 при потреблении элемента

Мы можем сопоставьте последовательность с индексом в массиве с помощью операции mod:

arrayIndex = sequence % capacity

Операция mod оборачивает последовательность вокруг границ, чтобы получить слот в буфере:

Давайте посмотрим, как мы будем вставлять элемент :

buffer[++writeSequence % capacity] = element

Мы предварительно увеличиваем последовательность перед вставкой элемента.

Чтобы использовать элемент, мы выполняем постинкремент:

element = buffer[readSequence++ % capacity]

В этом случае мы выполняем постинкремент последовательности. Использование элемента не удаляет его из буфера — он просто остается в массиве до тех пор, пока не будет перезаписан.

2.2. Пустые и полные буферы

Когда мы обходим массив, мы начинаем перезаписывать данные в буфере. Если буфер заполнен, мы можем либо перезаписать самые старые данные независимо от того, использовал ли их считыватель, либо предотвратить перезапись данных, которые не были прочитаны.

Если читатель может позволить себе пропустить промежуточные или старые значения (например, бегущую строку курса акций), мы можем перезаписать данные, не дожидаясь их использования. С другой стороны, если считыватель должен использовать все значения (как в случае с транзакциями электронной коммерции), мы должны ждать (ожидание блокировки/ожидания занятости), пока в буфере не появится свободный слот.

Буфер заполнен, если размер буфера равен его емкости, где его размер равен количеству непрочитанных элементов:

size = (writeSequence - readSequence) + 1
isFull = (size == capacity)

Если последовательность записи отстает от последовательности чтения, буфер empty:

isEmpty = writeSequence < readSequence

Буфер возвращает нулевое значение, если он пуст.

2.2. Преимущества и недостатки

Кольцевой буфер является эффективным буфером FIFO. Он использует массив фиксированного размера, который может быть предварительно выделен заранее, и обеспечивает эффективный шаблон доступа к памяти. Все операции с буфером выполняются за постоянное время O(1), включая потребление элемента, так как это не требует смещения элементов.

С другой стороны, определение правильного размера кольцевого буфера имеет решающее значение. Например, операции записи могут блокироваться на долгое время, если размер буфера недостаточен, а операции чтения выполняются медленно. Мы можем использовать динамическое изменение размера, но это потребует перемещения данных, и мы упустим большинство преимуществ, описанных выше.

3. Реализация на Java

Теперь, когда мы поняли, как работает кольцевой буфер, давайте приступим к его реализации на Java.

3.1. Инициализация

Во-первых, давайте определим конструктор, который инициализирует буфер с предопределенной емкостью:

public CircularBuffer(int capacity) {
    this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity;
    this.data = (E[]) new Object[this.capacity];
    this.readSequence = 0;
    this.writeSequence = -1;
}

Это создаст пустой буфер и инициализирует поля последовательности, как обсуждалось в предыдущем разделе.

3.2. Предложение

Далее мы реализуем операцию предложения, которая вставляет элемент в буфер в следующем доступном слоте и возвращает значение true в случае успеха. Возвращает false, если буфер не может найти пустой слот, то есть мы не можем перезаписать непрочитанные значения.

«Давайте реализуем метод offer на Java:

public boolean offer(E element) {
    boolean isFull = (writeSequence - readSequence) + 1 == capacity;
    if (!isFull) {
        int nextWriteSeq = writeSequence + 1;
        data[nextWriteSeq % capacity] = element;
        writeSequence++;
        return true;
    }
    return false;
}

Итак, мы увеличиваем последовательность записи и вычисляем индекс в массиве для следующего доступного слота. Затем мы записываем данные в буфер и сохраняем обновленную последовательность записи.

Давайте попробуем:

@Test
public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() {
    CircularBuffer buffer = new CircularBuffer<>(defaultCapacity);

    assertTrue(buffer.offer("Square"));
    assertEquals(1, buffer.size());
}

3.3. Опрос

Наконец, мы реализуем операцию опроса, которая извлекает и удаляет следующий непрочитанный элемент. Операция опроса не удаляет элемент, а увеличивает последовательность чтения.

Давайте реализуем это:

public E poll() {
    boolean isEmpty = writeSequence < readSequence;
    if (!isEmpty) {
        E nextValue = data[readSequence % capacity];
        readSequence++;
        return nextValue;
    }
    return null;
}

Здесь мы читаем данные в текущей последовательности чтения, вычисляя индекс в массиве. Затем мы увеличиваем последовательность и возвращаем значение, если буфер не пуст.

Давайте проверим это:

@Test
public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() {
    CircularBuffer buffer = new CircularBuffer<>(defaultCapacity);
    buffer.offer("Triangle");
    String shape = buffer.poll();

    assertEquals("Triangle", shape);
}

4. Проблема производитель-потребитель

Мы говорили об использовании кольцевого буфера для обмена данными между двумя или более потоками, что является примером синхронизации проблема, называемая проблемой производителя-потребителя. В Java мы можем решить проблему производитель-потребитель различными способами, используя семафоры, ограниченные очереди, кольцевые буферы и т. д.

Давайте реализуем решение на основе кольцевого буфера.

4.1. volatile Поля последовательности

Наша реализация кольцевого буфера не является потокобезопасной. Давайте сделаем его потокобезопасным для простого случая с одним производителем и одним потребителем.

Производитель записывает данные в буфер и увеличивает writeSequence, а потребитель только читает из буфера и увеличивает readSequence. Таким образом, резервный массив не конфликтует, и мы можем обойтись без синхронизации.

Но нам по-прежнему нужно убедиться, что потребитель может видеть последнее значение поля writeSequence (видимость) и что writeSequence не обновляется до того, как данные действительно будут доступны в буфере (упорядочение).

В этом случае мы можем сделать кольцевой буфер параллельным и свободным от блокировок, сделав поля последовательности изменчивыми:

private volatile int writeSequence = -1, readSequence = 0;

В методе offer запись в изменчивое поле writeSequence гарантирует, что запись в буфер произойдет перед обновлением последовательности. В то же время гарантия изменчивой видимости гарантирует, что потребитель всегда будет видеть самое последнее значение writeSequence.

4.2. Производитель

Давайте реализуем простой Runnable-производитель, который записывает в кольцевой буфер:

public void run() {
    for (int i = 0; i < items.length;) {
        if (buffer.offer(items[i])) {
           System.out.println("Produced: " + items[i]);
            i++;
        }
    }
}

Поток-производитель будет ожидать появления пустого слота в цикле (занято-ожидание).

4.3. Потребитель

Мы реализуем вызываемый объект-потребитель, который читает из буфера:

public T[] call() {
    T[] items = (T[]) new Object[expectedCount];
    for (int i = 0; i < items.length;) {
        T item = buffer.poll();
        if (item != null) {
            items[i++] = item;
            System.out.println("Consumed: " + item);
        }
    }
    return items;
}

Поток-потребитель продолжает работу без печати, если он получает нулевое значение из буфера.

Давайте напишем наш код драйвера:

executorService.submit(new Thread(new Producer<String>(buffer)));
executorService.submit(new Thread(new Consumer<String>(buffer)));

Выполнение нашей программы производитель-потребитель выдает результат, как показано ниже:

Produced: Circle
Produced: Triangle
  Consumed: Circle
Produced: Rectangle
  Consumed: Triangle
  Consumed: Rectangle
Produced: Square
Produced: Rhombus
  Consumed: Square
Produced: Trapezoid
  Consumed: Rhombus
  Consumed: Trapezoid
Produced: Pentagon
Produced: Pentagram
Produced: Hexagon
  Consumed: Pentagon
  Consumed: Pentagram
Produced: Hexagram
  Consumed: Hexagon
  Consumed: Hexagram

5. Заключение

В этом руководстве мы узнали, как реализовать Ring Buffer и исследовал, как его можно использовать для решения проблемы производитель-потребитель.

Как обычно, исходный код всех примеров доступен на GitHub.