«1. Введение

В этой статье мы продемонстрируем, как использовать начальные приложения Spring Cloud, которые предоставляют загрузочные и готовые к работе приложения, которые могут служить отправной точкой для будущей разработки.

Проще говоря, стартеры приложений Task предназначены для таких сценариев использования, как миграция базы данных и распределенное тестирование, а стартеры Stream App обеспечивают интеграцию с внешними системами.

Всего более 55 участников; ознакомьтесь с официальной документацией здесь и здесь для получения дополнительной информации об этих двух.

Далее мы создадим небольшое распределенное приложение Twitter, которое будет передавать сообщения Twitter в распределенную файловую систему Hadoop.

2. Начало установки

Мы будем использовать ключ потребителя и токен доступа для создания простого приложения Twitter.

Затем мы настроим Hadoop, чтобы мы могли сохранить наш Twitter Stream для будущих целей больших данных.

Наконец, у нас есть возможность либо использовать поставляемые репозитории Spring GitHub для компиляции и сборки автономных компонентов шаблона архитектуры «исходники-процессоры-приемники» с использованием Maven, либо объединять источники, процессоры и приемники через их интерфейсы привязки Spring Stream.

Мы рассмотрим оба способа сделать это.

Стоит отметить, что раньше все Stream App Starters были объединены в один большой репозиторий по адресу github.com/spring-cloud/spring-cloud-stream-app-starters. Каждый Starter был упрощен и изолирован.

3. Учетные данные Twitter

Во-первых, давайте настроим наши учетные данные Twitter Developer. Чтобы получить учетные данные разработчика Twitter, следуйте инструкциям по настройке приложения и созданию маркера доступа из официальной документации разработчика Twitter.

В частности, нам понадобятся:

  1. Consumer Key
  2. Consumer Key Secret
  3. Access Token Secret
  4. Access Token

Обязательно держите это окно открытым или запишите его, так как мы будем использовать его ниже!

4. Установка Hadoop

Теперь давайте установим Hadoop! Мы можем либо следовать официальной документации, либо просто использовать Docker. репозитории.

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1

5.1. Стартер приложения Twitter Spring Cloud Stream

Давайте добавим стартер приложения Twitter Spring Cloud Stream (org.springframework.cloud.stream.app.twitterstream.source) в наш проект:

Затем мы запускаем Maven: ~ ~~

Полученное скомпилированное стартовое приложение будет доступно в «/target» локального корня проекта.

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

Затем мы можем запустить этот скомпилированный .jar и передать соответствующие свойства приложения следующим образом:

./mvnw clean install -PgenerateApps

Мы также можем передать наши учетные данные, используя знакомые Spring application.properties:

5.2. Стартовое приложение HDFS Spring Cloud Stream

java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
    --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>

Теперь (с уже настроенным Hadoop) давайте добавим в наш проект зависимость HDFS Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.hdfs.sink).

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...

Сначала клонируйте соответствующий репозиторий:

Затем запустите задание Maven:

Полученное скомпилированное стартовое приложение будет доступно в «/target» локального корня проекта. Затем мы можем запустить этот скомпилированный .jar и передать соответствующие свойства приложения:

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

«hdfs://127.0.0.1:50010/» — это значение по умолчанию для Hadoop, о том, как вы настроили свой экземпляр.

./mvnw clean install -PgenerateApps

Мы можем увидеть список узлов данных (и их текущих портов) по адресу «http://0.0.0.0:50070», учитывая конфигурацию, которую мы передали ранее.

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/

Мы также можем передавать свои учетные данные, используя знакомые Spring application.properties перед компиляцией, поэтому нам не нужно всегда передавать их через CLI.

Давайте настроим наш application.properties для использования порта Hadoop по умолчанию:

6. Использование AggregateApplicationBuilder

В качестве альтернативы мы можем объединить наш Spring Stream Source и Sink через org.springframework.cloud.stream. агрегат.AggregateApplicationBuilder в простое приложение Spring Boot!

hdfs.fs-uri=hdfs://127.0.0.1:50010/

Во-первых, мы добавим два стартовых приложения Stream в наш pom.xml:

«

«Затем мы начнем объединять две наши зависимости Stream App Starter, заключая их в соответствующие подприложения.

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
</dependencies>

6.1. Создание компонентов нашего приложения

Наше SourceApp указывает источник, который необходимо преобразовать или использовать:

Обратите внимание, что мы привязываем наше SourceApp к org.springframework.cloud.stream.messaging.Source и внедряем соответствующий класс конфигурации для выбора задайте необходимые настройки из наших свойств окружающей среды.

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}

Затем мы настраиваем простую привязку org.springframework.cloud.stream.messaging.Processor:

Затем мы создаем нашего потребителя (Sink):

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}

Здесь мы привязываем наш SinkApp в org.springframework.cloud.stream.messaging.Sink и снова внедрите правильный класс конфигурации, чтобы использовать указанные нами настройки Hadoop.

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
    @ServiceActivator(inputChannel= Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}

Наконец, мы объединяем наши SourceApp, ProcessorApp и SinkApp, используя AggregateApplicationBuilder в нашем основном методе AggregateApp:

Как и в любом приложении Spring Boot, мы можем внедрить указанные параметры в качестве свойств среды через application.properties или программно.

@SpringBootApplication
public class AggregateApp {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}

Поскольку мы используем среду Spring Stream, мы также можем передать наши аргументы в конструктор AggregateApplicationBuilder.

6.2. Запуск завершенного приложения

Затем мы можем скомпилировать и запустить наше приложение, используя следующие инструкции командной строки:

Не забудьте хранить каждый класс @SpringBootApplication в отдельном пакете (иначе будет выдано несколько разных исключений привязки) ! Для получения дополнительной информации о том, как использовать AggregateApplicationBuilder, ознакомьтесь с официальной документацией.

    $ mvn install
    $ java -jar twitterhdfs.jar

После того, как мы скомпилируем и запустим наше приложение, мы должны увидеть что-то вроде следующего в нашей консоли (естественно, содержание зависит от твита):

Они демонстрируют правильную работу нашего процессора и стока при получении данных от источник! В этом примере мы не настроили наш HDFS Sink так, чтобы он делал что-либо — он просто напечатает сообщение «Полезная нагрузка получена!»

2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
com.baeldung.twitterhdfs.sink.SinkApp    : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...

7. Заключение

В этом уроке мы узнали, как объедините два замечательных стартовых приложения Spring Stream в один прекрасный пример Spring Boot!

Вот несколько других замечательных официальных статей о Spring Boot Starters и о том, как создать индивидуальный стартер!

Как всегда, код, использованный в статье, можно найти на GitHub.

«