«1. Обзор

В этом руководстве мы кратко рассмотрим Big Queue, Java-реализацию постоянной очереди.

Мы немного поговорим о его архитектуре, а затем научимся использовать его на быстрых и практических примерах.

2. Использование

Нам нужно добавить зависимость bigqueue в наш проект:

<dependency>
    <groupId>com.leansoft</groupId>
    <artifactId>bigqueue</artifactId>
    <version>0.7.0</version>
</dependency>

Нам также нужно добавить его репозиторий:

<repository>
    <id>github.release.repo</id>
    <url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url>
</repository>

Если мы привыкли работать с базовые очереди, адаптироваться к Big Queue не составит труда, так как его API очень похож.

2.1. Инициализация

Мы можем инициализировать нашу очередь, просто вызвав ее конструктор:

@Before
public void setup() {
    String queueDir = System.getProperty("user.home");
    String queueName = "baeldung-queue";
    bigQueue = new BigQueueImpl(queueDir, queueName);
}

Первый аргумент — это домашний каталог для нашей очереди.

Второй аргумент представляет имя нашей очереди. Он создаст папку в домашнем каталоге нашей очереди, где мы сможем сохранять данные.

Мы должны не забыть закрыть нашу очередь, когда закончим, чтобы предотвратить утечку памяти:

bigQueue.close();

2.2. Вставка

Мы можем добавить элементы в хвост, просто вызвав метод enqueue:

@Test
public void whenAddingRecords_ThenTheSizeIsCorrect() {
    for (int i = 1; i <= 100; i++) {
        bigQueue.enqueue(String.valueOf(i).getBytes());
    }
 
    assertEquals(100, bigQueue.size());
}

Следует отметить, что Big Queue поддерживает только тип данных byte[], поэтому мы несем ответственность за сериализацию наших записей при вставке.

2.3. Чтение

Как мы и ожидали, чтение данных с помощью метода удаления из очереди выполняется так же просто:

@Test
public void whenAddingRecords_ThenTheyCanBeRetrieved() {
    bigQueue.enqueue(String.valueOf("new_record").getBytes());

    String record = new String(bigQueue.dequeue());
 
    assertEquals("new_record", record);
}

Мы также должны соблюдать осторожность, чтобы правильно десериализовать наши данные при чтении.

Чтение из пустой очереди вызывает исключение NullPointerException.

Мы должны убедиться, что в нашей очереди есть значения, используя метод isEmpty:

if(!bigQueue.isEmpty()){
    // read
}

Чтобы очистить нашу очередь, не просматривая каждую запись, мы можем использовать метод removeAll:

bigQueue.removeAll();

2.4 . Просмотр

При просмотре мы просто читаем запись, не потребляя ее:

@Test
public void whenPeekingRecords_ThenSizeDoesntChange() {
    for (int i = 1; i <= 100; i++) {
        bigQueue.enqueue(String.valueOf(i).getBytes());
    }
 
    String firstRecord = new String(bigQueue.peek());

    assertEquals("1", firstRecord);
    assertEquals(100, bigQueue.size());
}

2.5. Удаление потребляемых записей

Когда мы вызываем метод dequeue, записи удаляются из нашей очереди, но остаются на диске.

Это потенциально может заполнить наш диск ненужными данными.

К счастью, мы можем удалить использованные записи с помощью метода gc:

bigQueue.gc();

Точно так же, как сборщик мусора в Java очищает объекты, на которые нет ссылок, из кучи, gc очищает использованные записи с нашего диска.

3. Архитектура и функции

Что интересно в Big Queue, так это то, что его кодовая база чрезвычайно мала — всего 12 исходных файлов, занимающих около 20 КБ дискового пространства.

На высоком уровне это просто постоянная очередь, которая отлично справляется с обработкой больших объемов данных.

3.1. Обработка больших объемов данных

Размер очереди ограничен только общим доступным дисковым пространством. Каждая запись в нашей очереди сохраняется на диске, чтобы быть устойчивой к сбоям.

Нашим узким местом будет дисковый ввод-вывод, а это означает, что твердотельный накопитель значительно улучшит среднюю пропускную способность по сравнению с жестким диском.

3.2. Чрезвычайно быстрый доступ к данным

Если мы посмотрим на его исходный код, то заметим, что очередь поддерживается файлом с отображением памяти. Доступная часть нашей очереди (голова) хранится в оперативной памяти, поэтому доступ к записям будет чрезвычайно быстрым.

Даже если наша очередь станет очень большой и будет занимать терабайты дискового пространства, мы все равно сможем считывать данные с временной сложностью O(1).

Если нам нужно читать много сообщений, а скорость является критической проблемой, мы должны рассмотреть возможность использования SSD вместо жесткого диска, так как перемещение данных с диска в память будет намного быстрее.

3.3. Преимущества

Большим преимуществом является его способность достигать очень больших размеров. Мы можем масштабировать его до теоретической бесконечности, просто добавив больше памяти, отсюда и его название «Большой».

В параллельной среде Big Queue может производить и потреблять около 166 МБ/с данных на обычном компьютере.

Если средний размер нашего сообщения составляет 1 КБ, он может обрабатывать 166 тыс. сообщений в секунду.

Он может обрабатывать до 333 тыс. сообщений в секунду в однопоточной среде — довольно впечатляюще!

3.4. Недостатки

Наши сообщения остаются на диске даже после того, как мы их израсходовали, поэтому нам приходится заботиться о сборе мусора, когда он нам больше не нужен.

Мы также отвечаем за сериализацию и десериализацию наших сообщений.

4. Заключение

«В этом кратком руководстве мы узнали о большой очереди и о том, как мы можем использовать ее в качестве масштабируемой и постоянной очереди.

Как всегда, код доступен на Github.