«1. Обзор

В этом руководстве мы покажем, как мы можем использовать R2DBC для выполнения операций с базой данных реактивным способом.

Чтобы изучить R2DBC, мы создадим простое приложение Spring WebFlux REST, которое реализует операции CRUD для одного объекта, используя для достижения этой цели только асинхронные операции.

2. Что такое R2DBC?

Реактивная разработка находится на подъеме: новые фреймворки появляются каждый день, а существующие получают все большее распространение. Однако основной проблемой реактивной разработки является тот факт, что доступ к базе данных в мире Java/JVM остается в основном синхронным. Это прямое следствие того, как был разработан JDBC, и привело к некоторым уродливым хакам для адаптации этих двух принципиально разных подходов.

Чтобы удовлетворить потребность в асинхронном доступе к базе данных в мире Java, появились два стандарта. Первый из них, ADBC ​​(API для асинхронного доступа к базе данных), поддерживается Oracle, но на момент написания этой статьи кажется, что он несколько застопорился и не имеет четкой временной шкалы.

Второй, который мы рассмотрим здесь, — это R2DBC (Reactive Relational Database Connectivity), работа сообщества, возглавляемая командой из Pivotal и других компаний. Этот проект, который все еще находится в стадии бета-тестирования, продемонстрировал большую жизнеспособность и уже предоставляет драйверы для баз данных Postgres, H2 и MSSQL.

3. Настройка проекта

Использование R2DBC в проекте требует добавления зависимостей к основному API и подходящего драйвера. В нашем примере мы будем использовать H2, поэтому это означает только две зависимости:

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>0.8.0.M7</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>0.8.0.M7</version>
</dependency>

Maven Central пока еще не имеет артефактов R2DBC, поэтому нам также нужно добавить в наш проект пару репозиториев Spring: ~ ~~

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
   </repository>
   <repository>
       <id>spring-snapshots</id>
       <name>Spring Snapshots</name>
       <url>https://repo.spring.io/snapshot</url>
       <snapshots>
           <enabled>true</enabled>
       </snapshots>
    </repository>
</repositories>

4. Настройка фабрики соединений

Первое, что нам нужно сделать, чтобы получить доступ к базе данных с помощью R2DBC, — это создать объект ConnectionFactory, который играет роль, аналогичную DataSource JDBC. Самый простой способ создать ConnectionFactory — использовать класс ConnectionFactory.

Этот класс имеет статические методы, которые принимают объект ConnectionFactoryOptions и возвращают ConnectionFactory. Так как нам понадобится только один экземпляр нашей ConnectionFactory, давайте создадим @Bean, который мы сможем позже использовать через инъекцию везде, где нам нужно:

@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
    ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
    Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
    if (!StringUtil.isNullOrEmpty(properties.getUser())) {
        ob = ob.option(USER, properties.getUser());
    }
    if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
        ob = ob.option(PASSWORD, properties.getPassword());
    }        
    return ConnectionFactories.get(ob.build());    
}

Здесь мы берем параметры, полученные от вспомогательного класса, украшенного @ConfigurationProperties аннотацию и заполнить наш экземпляр ConnectionFactoryOptions. Чтобы заполнить его, R2DBC реализует шаблон построителя с одним методом option, который принимает Option и значение.

R2DBC определяет ряд хорошо известных опций, таких как USERNAME и PASSWORD, которые мы использовали выше. Другой способ установить эти параметры — передать строку соединения методу parse() класса ConnectionFactoryOptions.

Вот пример типичного URL-адреса подключения R2DBC:

r2dbc:h2:mem://./testdb

Давайте разобьем эту строку на компоненты:

    r2dbc: идентификатор фиксированной схемы для URL-адресов R2DBC. SSL-защищенные соединения h2: Идентификатор драйвера, используемый для поиска соответствующей фабрики соединений. mem: Протокол, специфичный для драйвера — в нашем случае это соответствует базе данных в памяти. , базу данных и любые дополнительные параметры.

Как только у нас есть готовый набор опций, мы передаем его статическому методу фабрики get() для создания нашего bean-компонента ConnectionFactory.

5. Выполнение операторов

Подобно JDBC, использование R2DBC в основном связано с отправкой операторов SQL в базу данных и обработкой наборов результатов. Однако, поскольку R2DBC является реактивным API, он сильно зависит от типов реактивных потоков, таких как издатель и подписчик.

Использование этих типов напрямую немного громоздко, поэтому мы будем использовать типы реактора проекта, такие как Mono и Flux, которые помогут нам писать более чистый и лаконичный код.

В следующих разделах мы увидим, как реализовать задачи, связанные с базой данных, путем создания реактивного класса DAO для простого класса Account. Этот класс содержит всего три свойства и имеет соответствующую таблицу в нашей базе данных:

public class Account {
    private Long id;
    private String iban;
    private BigDecimal balance;
    // ... getters and setters omitted
}

5.1. Получение соединения

«Прежде чем мы сможем отправлять какие-либо операторы в базу данных, нам нужен экземпляр Connection. Мы уже видели, как создать ConnectionFactory, поэтому неудивительно, что мы будем использовать его для получения Connection. Что мы должны помнить, так это то, что теперь вместо обычного Connection мы получаем Publisher одного Connection.

Наш ReactiveAccountDao, который является обычным Spring @Component, получает свою ConnectionFactory через внедрение конструктора, поэтому он легко доступен в методах обработчика.

Давайте посмотрим на первые пару строк метода findById(), чтобы увидеть, как получить и начать использовать Connection:

public Mono<Account>> findById(Long id) {         
    return Mono.from(connectionFactory.create())
      .flatMap(c ->
          // use the connection
      )
      // ... downstream processing omitted
}

Здесь мы адаптируем Publisher, возвращенный из нашей ConnectionFactory, в Mono, который является исходным источником для нашего потока событий.

5.1. Подготовка и отправка операторов

Теперь, когда у нас есть соединение, давайте воспользуемся им для создания оператора и привяжем к нему параметр:

.flatMap( c -> 
    Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
      .bind("$1", id)
      .execute())
      .doFinally((st) -> close(c))
 )

Метод createStatement соединения принимает строку запроса SQL, которая может иметь заполнители для привязки — называемые в спецификации «маркерами».

Пара замечаний: во-первых, createStatement — это синхронная операция, которая позволяет нам использовать плавный стиль для привязки значений к возвращаемому оператору; во-вторых, и это очень важно, синтаксис заполнителя/маркера зависит от поставщика!

В этом примере мы используем особый синтаксис H2, который использует $n для обозначения параметров. Другие поставщики могут использовать другой синтаксис, например :param, @Pn или другое соглашение. Это важный аспект, на который мы должны обратить внимание при переносе устаревшего кода на этот новый API.

Сам процесс связывания довольно прост из-за плавного шаблона API и упрощенного ввода: есть только один перегруженный метод bind(), который заботится обо всех преобразованиях ввода — конечно, в соответствии с правилами базы данных.

Первый параметр, передаваемый в bind(), может быть порядковым номером, начинающимся с нуля, который соответствует положению маркера в операторе, или может быть строкой с фактическим маркером.

После того, как мы установили значения для всех параметров, мы вызываем execute(), который возвращает Publisher объектов Result, которые мы снова переносим в Mono для дальнейшей обработки. Мы присоединяем обработчик doFinally() к этому Mono, чтобы убедиться, что мы закроем наше соединение, независимо от того, завершится ли обработка потока нормально или нет.

5.2. Обработка результатов

Следующий шаг в нашем конвейере отвечает за обработку объектов Result и создание потока экземпляров ResponseEntity\u003cAccount\u003e.

Поскольку мы знаем, что может быть только один экземпляр с заданным идентификатором, мы фактически вернем поток Mono. Фактическое преобразование происходит внутри функции, переданной методу map() полученного результата:

.map(result -> result.map((row, meta) -> 
    new Account(row.get("id", Long.class),
      row.get("iban", String.class),
      row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));

Метод результата map() ожидает функцию, которая принимает два параметра. Первый — это объект Row, который мы используем для сбора значений для каждого столбца и заполнения экземпляра Account. Второй, meta, представляет собой объект RowMetadata, который содержит информацию о текущей строке, такую ​​как имена и типы столбцов.

Предыдущий вызов map() в нашем конвейере разрешается в Mono\u003cProducer\u003cAccount\u003e\u003e, но нам нужно вернуть Mono\u003cAccount\u003e из этого метода. Чтобы исправить это, мы добавляем последний шаг flatMap(), который адаптирует Producer к Mono.

5.3. Пакетные операторы

R2DBC также поддерживает создание и выполнение пакетов операторов, что позволяет выполнять несколько операторов SQL в одном вызове execute(). В отличие от обычных операторов пакетные операторы не поддерживают привязку и в основном используются для повышения производительности в таких сценариях, как задания ETL.

В нашем примере проекта используется пакет операторов для создания таблицы Account и вставки в нее некоторых тестовых данных:

@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
    return (args) ->
      Flux.from(cf.create())
        .flatMap(c -> 
            Flux.from(c.createBatch()
              .add("drop table if exists Account")
              .add("create table Account(" +
                "id IDENTITY(1,1)," +
                "iban varchar(80) not null," +
                "balance DECIMAL(18,2) not null)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120980198201982',100.00)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120998729871000',250.00)")
              .execute())
            .doFinally((st) -> c.close())
          )
        .log()
        .blockLast();
}

Здесь мы используем пакет, возвращенный функцией createBatch(), и добавляем несколько операторов SQL. Затем мы отправляем эти инструкции для выполнения, используя тот же метод execute(), доступный в интерфейсе Statement.

«В данном конкретном случае нас не интересуют какие-либо результаты — просто все операторы выполняются нормально. Если бы нам понадобились какие-либо полученные результаты, все, что нам нужно было сделать, это добавить в этот поток следующий шаг для обработки сгенерированных объектов Result.

6. Транзакции

Последняя тема, которую мы рассмотрим в этом руководстве, — это транзакции. Как и следовало ожидать, мы управляем транзакциями так же, как в JDBC, то есть с помощью методов, доступных в объекте Connection.

Как и раньше, основное отличие состоит в том, что теперь все методы, связанные с транзакциями, являются асинхронными, возвращая Publisher, который мы должны добавить в наш поток в соответствующих точках.

Наш пример проекта использует транзакцию в реализации метода createAccount():

public Mono<Account> createAccount(Account account) {    
    return Mono.from(connectionFactory.create())
      .flatMap(c -> Mono.from(c.beginTransaction())
        .then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
          .bind("$1", account.getIban())
          .bind("$2", account.getBalance())
          .returnGeneratedValues("id")
          .execute()))
        .map(result -> result.map((row, meta) -> 
            new Account(row.get("id", Long.class),
              account.getIban(),
              account.getBalance())))
        .flatMap(pub -> Mono.from(pub))
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close()));   
}

Здесь мы добавили вызовы, связанные с транзакциями, в двух точках. Во-первых, сразу после получения нового соединения из базы данных мы вызываем метод beginTransactionMethod(). Как только мы узнаем, что транзакция была успешно запущена, мы подготавливаем и выполняем оператор вставки.

На этот раз мы также использовали метод returnGeneratedValues(), чтобы указать базе данных вернуть значение идентификатора, сгенерированное для этого нового Пользователя. R2DBC возвращает эти значения в результате, содержащем одну строку со всеми сгенерированными значениями, которые мы используем для создания экземпляра учетной записи.

Опять же, нам нужно преобразовать входящий Mono\u003cPublisher\u003cAccount\u003e\u003e в Mono\u003cAccount\u003e, поэтому для решения этой проблемы мы добавляем flatMap(). Затем мы фиксируем транзакцию на шаге delayUntil(). Нам это нужно, потому что мы хотим убедиться, что возвращенный Аккаунт уже зафиксирован в базе данных.

Наконец, мы присоединяем к этому пайплайну шаг doFinally, который закрывает Connection, когда все события из возвращенного Mono исчерпаны.

7. Пример использования DAO

Теперь, когда у нас есть реактивный DAO, давайте воспользуемся им для создания простого приложения Spring WebFlux, чтобы продемонстрировать, как его использовать в типичном приложении. Поскольку этот фреймворк уже поддерживает реактивные конструкции, это становится тривиальной задачей. Например, давайте взглянем на реализацию метода GET:

@RestController
public class AccountResource {
    private final ReactiveAccountDao accountDao;

    public AccountResource(ReactiveAccountDao accountDao) {
        this.accountDao = accountDao;
    }

    @GetMapping("/accounts/{id}")
    public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
        return accountDao.findById(id)
          .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
          .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
    }
    // ... other methods omitted
}

Здесь мы используем возвращенный нашим DAO Mono для создания ResponseEntity с соответствующим кодом состояния. Мы делаем это только потому, что нам нужен код состояния NOT_FOUND (404), когда нет учетной записи с данным идентификатором.

8. Заключение

В этой статье мы рассмотрели основы реактивного доступа к базе данных с использованием R2DBC. Несмотря на то, что этот проект находится в зачаточном состоянии, он быстро развивается, и его выпуск намечен на начало 2020 года.

По сравнению с ADBA, который определенно не будет частью Java 12, R2DBC кажется более многообещающим и уже предоставляет драйверы для нескольких популярные базы данных — Oracle здесь заметно отсутствует.

Как обычно, полный исходный код, используемый в этом руководстве, доступен на Github.