«1. Обзор

InfluxDB — это высокопроизводительное хранилище данных временных рядов. Он поддерживает вставку и запрос данных в реальном времени с помощью языка запросов, подобного SQL.

В этой вводной статье мы покажем, как подключиться к серверу InfluxDb, создать базу данных, записать информацию о временных рядах, а затем выполнить запрос к базе данных.

2. Настройка

Чтобы подключиться к базе данных, нам нужно добавить запись в наш файл pom.xml:

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.8</version>
</dependency>

Последнюю версию этой зависимости можно найти на Maven Central.

Нам также понадобится экземпляр InfluxDB. Инструкции по загрузке и установке базы данных можно найти на веб-сайте InfluxData.

3. Подключение к серверу

3.1. Создание соединения

Создание соединения с базой данных требует передачи строки URL и учетных данных пользователя в фабрику соединений:

InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);

3.2. Проверка соединения

Обмен данными с базой данных осуществляется через RESTful API, поэтому они не являются постоянными.

API предлагает специальную службу проверки связи для подтверждения работоспособности соединения. Если соединение хорошее, ответ содержит версию базы данных. Если нет, он содержит «неизвестно».

Итак, после создания соединения мы можем проверить его, выполнив:

Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
    log.error("Error pinging server.");
    return;
} 

3.3. Создание базы данных

Создание базы данных InfluxDB аналогично созданию базы данных на большинстве платформ. Но нам нужно создать хотя бы одну политику хранения перед ее использованием.

Политика хранения сообщает базе данных, как долго данные должны храниться. Временные ряды, такие как статистика ЦП или памяти, имеют тенденцию накапливаться в больших наборах данных.

Типичной стратегией управления размером баз данных временных рядов является субдискретизация. «Необработанные» данные сохраняются с высокой скоростью, обобщаются, а затем через короткое время удаляются.

Политики хранения упрощают это, связывая часть данных со сроком действия. У InfluxData есть подробное объяснение на их сайте.

После создания базы данных мы добавим одну политику с именем defaultPolicy. Он просто будет хранить данные в течение 30 дней:

influxDB.createDatabase("baeldung");
influxDB.createRetentionPolicy(
  "defaultPolicy", "baeldung", "30d", 1, true);

Для создания политики хранения нам понадобятся имя, база данных, интервал, коэффициент репликации (который должен быть равен 1 для базы данных с одним экземпляром), и логическое значение, указывающее, что это политика по умолчанию.

3.4. Установка уровня ведения журнала

Внутри InfluxDB API использует Retrofit и предоставляет интерфейс для средства ведения журнала Retrofit через перехватчик ведения журнала.

Итак, мы можем установить уровень ведения журнала, используя:

influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);

И теперь мы можем видеть сообщения, когда мы открываем соединение и пингуем его:

Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping

Доступные уровни: BASIC, FULL, HEADERS, и НЕТ.

4. Добавление и извлечение данных

4.1. Точки

Итак, теперь мы готовы начать вставку и извлечение данных.

Основной единицей информации в InfluxDB является точка, которая по сути представляет собой временную метку и карту «ключ-значение».

Давайте взглянем на точку, содержащую данные об использовании памяти:

Point point = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743656L)
  .addField("used", 1015096L)
  .addField("buffer", 1010467L)
  .build();

Мы создали запись, содержащую три значения Long в качестве статистики памяти, имя хоста и отметку времени.

Давайте посмотрим, как добавить это в базу данных.

4.2. Запись пакетов

Данные временных рядов, как правило, состоят из множества мелких точек, и запись этих записей по одной за раз была бы очень неэффективной. Предпочтительным методом является сбор записей в пакеты.

API InfluxDB предоставляет объект BatchPoint:

BatchPoints batchPoints = BatchPoints
  .database(dbName)
  .retentionPolicy("defaultPolicy")
  .build();

Point point1 = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1") 
  .addField("free", 4743656L)
  .addField("used", 1015096L) 
  .addField("buffer", 1010467L)
  .build();

Point point2 = Point.measurement("memory")
  .time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743696L)
  .addField("used", 1016096L)
  .addField("buffer", 1008467L)
  .build();

batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);

Мы создаем BatchPoint, а затем добавляем к нему точки. Мы устанавливаем метку времени для нашей второй записи на 100 миллисекунд в прошлом, поскольку метки времени являются первичным индексом. Если мы отправим две точки с одной и той же меткой времени, останется только одна.

Обратите внимание, что мы должны связать BatchPoints с базой данных и политикой хранения.

4.3. Запись по одному

Пакетная обработка может быть непрактичной для некоторых случаев использования.

Давайте включим пакетный режим одним вызовом соединения с InfluxDB:

influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);

Мы включили пакетную обработку 100 для вставки на сервер или отправки того, что есть, каждые 200 миллисекунд.

С включенным пакетным режимом мы все еще можем писать по одному. Однако требуется дополнительная настройка:

influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);

«

influxDB.write(point);

«Более того, теперь мы можем записывать отдельные точки, и они собираются пакетами фоновым потоком: политика хранения по умолчанию. Поэтому, если мы хотим воспользоваться преимуществами даунсэмплинга с несколькими политиками хранения, создание пакетов — это то, что нужно.

Пакетный режим использует отдельный пул потоков. Поэтому рекомендуется отключать его, когда он больше не нужен:

influxDB.disableBatch();

Закрытие соединения также отключит пул потоков:

influxDB.close();

4.4. Сопоставление результатов запроса

Запросы возвращают QueryResult, который мы можем сопоставить с POJO.

Прежде чем мы рассмотрим синтаксис запроса, давайте создадим класс для хранения нашей статистики памяти:

@Measurement(name = "memory")
public class MemoryPoint {

    @Column(name = "time")
    private Instant time;

    @Column(name = "name")
    private String name;

    @Column(name = "free")
    private Long free;

    @Column(name = "used")
    private Long used;

    @Column(name = "buffer")
    private Long buffer;
}

Класс снабжен аннотацией @Measurement(name = «memory»), соответствующей Point.measurement( «память»), которые мы использовали для создания наших точек.

Для каждого поля в нашем QueryResult мы добавляем аннотацию @Column(name = «XXX») с именем соответствующего поля.

QueryResults сопоставляются с POJO с помощью InfluxDBResultMapper.

4.5. Запрос InfluxDB

Итак, давайте используем наш POJO с точками, которые мы добавили в базу данных в нашем пакете из двух точек:

QueryResult queryResult = connection
  .performQuery("Select * from memory", "baeldung");

InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<MemoryPoint> memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());

Запрос показывает, как наше измерение с именем memory хранится в виде таблицы точек, которую мы можем выбрать. от.

InfluxDBResultMapper принимает ссылку на MemoryPoint.class с QueryResult и возвращает список точек.

После сопоставления результатов мы убеждаемся, что получили два, проверяя длину списка, полученного из запроса. Затем мы смотрим на первую запись в списке и видим размер свободной памяти второй точки, которую мы вставили. По умолчанию результаты запросов из InfluxDB упорядочиваются по возрастанию по отметке времени.

Давайте изменим это:

queryResult = connection.performQuery(
  "Select * from memory order by time desc", "baeldung");
memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());

Добавление порядка по времени описания меняет порядок наших результатов на противоположный.

Запросы InfluxDB очень похожи на SQL. На их сайте есть обширный справочник.

5. Заключение

Мы подключились к серверу InfluxDB, создали базу данных с политикой хранения, а затем вставили и получили данные с сервера.

Полный исходный код примеров находится на GitHub.