«1. Обзор
В этом руководстве мы кратко рассмотрим Big Queue, Java-реализацию постоянной очереди.
Мы немного поговорим о его архитектуре, а затем научимся использовать его на быстрых и практических примерах.
2. Использование
Нам нужно добавить зависимость bigqueue в наш проект:
<dependency>
<groupId>com.leansoft</groupId>
<artifactId>bigqueue</artifactId>
<version>0.7.0</version>
</dependency>
Нам также нужно добавить его репозиторий:
<repository>
<id>github.release.repo</id>
<url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url>
</repository>
Если мы привыкли работать с базовые очереди, адаптироваться к Big Queue не составит труда, так как его API очень похож.
2.1. Инициализация
Мы можем инициализировать нашу очередь, просто вызвав ее конструктор:
@Before
public void setup() {
String queueDir = System.getProperty("user.home");
String queueName = "baeldung-queue";
bigQueue = new BigQueueImpl(queueDir, queueName);
}
Первый аргумент — это домашний каталог для нашей очереди.
Второй аргумент представляет имя нашей очереди. Он создаст папку в домашнем каталоге нашей очереди, где мы сможем сохранять данные.
Мы должны не забыть закрыть нашу очередь, когда закончим, чтобы предотвратить утечку памяти:
bigQueue.close();
2.2. Вставка
Мы можем добавить элементы в хвост, просто вызвав метод enqueue:
@Test
public void whenAddingRecords_ThenTheSizeIsCorrect() {
for (int i = 1; i <= 100; i++) {
bigQueue.enqueue(String.valueOf(i).getBytes());
}
assertEquals(100, bigQueue.size());
}
Следует отметить, что Big Queue поддерживает только тип данных byte[], поэтому мы несем ответственность за сериализацию наших записей при вставке.
2.3. Чтение
Как мы и ожидали, чтение данных с помощью метода удаления из очереди выполняется так же просто:
@Test
public void whenAddingRecords_ThenTheyCanBeRetrieved() {
bigQueue.enqueue(String.valueOf("new_record").getBytes());
String record = new String(bigQueue.dequeue());
assertEquals("new_record", record);
}
Мы также должны соблюдать осторожность, чтобы правильно десериализовать наши данные при чтении.
Чтение из пустой очереди вызывает исключение NullPointerException.
Мы должны убедиться, что в нашей очереди есть значения, используя метод isEmpty:
if(!bigQueue.isEmpty()){
// read
}
Чтобы очистить нашу очередь, не просматривая каждую запись, мы можем использовать метод removeAll:
bigQueue.removeAll();
2.4 . Просмотр
При просмотре мы просто читаем запись, не потребляя ее:
@Test
public void whenPeekingRecords_ThenSizeDoesntChange() {
for (int i = 1; i <= 100; i++) {
bigQueue.enqueue(String.valueOf(i).getBytes());
}
String firstRecord = new String(bigQueue.peek());
assertEquals("1", firstRecord);
assertEquals(100, bigQueue.size());
}
2.5. Удаление потребляемых записей
Когда мы вызываем метод dequeue, записи удаляются из нашей очереди, но остаются на диске.
Это потенциально может заполнить наш диск ненужными данными.
К счастью, мы можем удалить использованные записи с помощью метода gc:
bigQueue.gc();
Точно так же, как сборщик мусора в Java очищает объекты, на которые нет ссылок, из кучи, gc очищает использованные записи с нашего диска.
3. Архитектура и функции
Что интересно в Big Queue, так это то, что его кодовая база чрезвычайно мала — всего 12 исходных файлов, занимающих около 20 КБ дискового пространства.
На высоком уровне это просто постоянная очередь, которая отлично справляется с обработкой больших объемов данных.
3.1. Обработка больших объемов данных
Размер очереди ограничен только общим доступным дисковым пространством. Каждая запись в нашей очереди сохраняется на диске, чтобы быть устойчивой к сбоям.
Нашим узким местом будет дисковый ввод-вывод, а это означает, что твердотельный накопитель значительно улучшит среднюю пропускную способность по сравнению с жестким диском.
3.2. Чрезвычайно быстрый доступ к данным
Если мы посмотрим на его исходный код, то заметим, что очередь поддерживается файлом с отображением памяти. Доступная часть нашей очереди (голова) хранится в оперативной памяти, поэтому доступ к записям будет чрезвычайно быстрым.
Даже если наша очередь станет очень большой и будет занимать терабайты дискового пространства, мы все равно сможем считывать данные с временной сложностью O(1).
Если нам нужно читать много сообщений, а скорость является критической проблемой, мы должны рассмотреть возможность использования SSD вместо жесткого диска, так как перемещение данных с диска в память будет намного быстрее.
3.3. Преимущества
Большим преимуществом является его способность достигать очень больших размеров. Мы можем масштабировать его до теоретической бесконечности, просто добавив больше памяти, отсюда и его название «Большой».
В параллельной среде Big Queue может производить и потреблять около 166 МБ/с данных на обычном компьютере.
Если средний размер нашего сообщения составляет 1 КБ, он может обрабатывать 166 тыс. сообщений в секунду.
Он может обрабатывать до 333 тыс. сообщений в секунду в однопоточной среде — довольно впечатляюще!
3.4. Недостатки
Наши сообщения остаются на диске даже после того, как мы их израсходовали, поэтому нам приходится заботиться о сборе мусора, когда он нам больше не нужен.
Мы также отвечаем за сериализацию и десериализацию наших сообщений.
4. Заключение
«В этом кратком руководстве мы узнали о большой очереди и о том, как мы можем использовать ее в качестве масштабируемой и постоянной очереди.
Как всегда, код доступен на Github.