«1. Обзор

В этой статье мы сосредоточимся на различных типах Планировщиков, которые будем использовать при написании многопоточных программ на основе методов RxJava Observable subscribeOn иObservable.

Планировщики дают возможность указать, где и, вероятно, когда выполнять задачи, связанные с работой цепочки Observable.

Мы можем получить Scheduler из фабричных методов, описанных в классе Schedulers.

2. Поведение потоков по умолчанию

По умолчанию Rx является однопоточным, что означает, что Observable и цепочка операторов, которые мы можем применить к нему, будут уведомлять своих наблюдателей в том же потоке, в котором находится его метод subscribe(). называется.

МетодыObservOn и SubscribeOn принимают в качестве аргумента Планировщик, который, как следует из названия, представляет собой инструмент, который мы можем использовать для планирования отдельных действий.

Мы создадим нашу реализацию Планировщика, используя метод createWorker, который возвращает Scheduler.Worker. Рабочий процесс принимает действия и выполняет их последовательно в одном потоке.

В некотором смысле worker сам по себе является планировщиком, но мы не будем называть его планировщиком, чтобы избежать путаницы.

2.1. Планирование действия

Мы можем запланировать задание в любом Планировщике, создав нового работника и запланировав некоторые действия:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
 
Assert.assertTrue(result.equals("action"));

Затем действие ставится в очередь в потоке, которому назначен работник.

2.2. Отмена действия

Scheduler.Worker расширяет подписку. Вызов метода отмены подписки для рабочего процесса приведет к очистке очереди и отмене всех ожидающих выполнения задач. Мы можем видеть это на примере:

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += "First_Action";
    worker.unsubscribe();
});
worker.schedule(() -> result += "Second_Action");
 
Assert.assertTrue(result.equals("First_Action"));

Вторая задача никогда не выполняется, потому что предыдущая отменила всю операцию. Действия, которые находились в процессе выполнения, будут прерваны.

3. Schedulers.newThread

Этот планировщик просто запускает новый поток каждый раз, когда его запрашивают через subscribeOn() илиObservOn().

Вряд ли это хороший выбор, не только из-за задержки, связанной с запуском потока, но также и потому, что этот поток не используется повторно:

Observable.just("Hello")
  .observeOn(Schedulers.newThread())
  .doOnNext(s ->
    result2 += Thread.currentThread().getName()
  )
  .observeOn(Schedulers.newThread())
  .subscribe(s ->
    result1 += Thread.currentThread().getName()
  );
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

Когда Worker завершается, поток просто завершается. Этот Планировщик можно использовать только тогда, когда задачи являются крупнозернистыми: на выполнение уходит много времени, но их очень мало, так что повторное использование потоков вряд ли вообще возможно.

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_");
    result += "_End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals(
  "RxNewThreadScheduler-1_Start_End_worker_"));

Когда мы запланировали рабочий процесс в NewThreadScheduler, мы увидели, что рабочий процесс привязан к определенному потоку.

4. Schedulers.immediate

Schedulers.immediate — это специальный планировщик, который вызывает задачу в клиентском потоке блокирующим, а не асинхронным способом и возвращает результат, когда действие завершено:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_");
    result += "_End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals(
  "main_Start_worker__End"));

На самом деле , подписка на Observable через немедленный планировщик обычно имеет тот же эффект, что и полное отсутствие подписки на какой-либо конкретный планировщик: в том же потоке, эффективно блокируя.

Observable.just("Hello")
  .subscribeOn(Schedulers.immediate())
  .subscribe(s ->
    result += Thread.currentThread().getName()
  );
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));

Однако предстоящая задача выполняется, когда все ранее запланированные задачи завершены:

Немедленно вызывает заданную задачу сразу, тогда как трамплин ожидает завершения текущей задачи.

Рабочий процесс батута выполняет каждую задачу в потоке, запланировавшем первую задачу. Первый вызов schedule блокируется до тех пор, пока очередь не будет очищена:

Observable.just(2, 4, 6, 8)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579"));

6. Schedulers.from

Планировщики внутренне более сложны, чем Executors из java.util.concurrent, поэтому потребовалась отдельная абстракция.

Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "Start";
    worker.schedule(() -> {
        result += "_middleStart";
        worker.schedule(() ->
            result += "_worker_"
        );
        result += "_middleEnd";
    });
    result += "_mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result
  .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

Но поскольку они концептуально очень похожи, неудивительно, что существует оболочка, которая может превратить Executor в Scheduler, используя метод from factory:

SchedulerB используется в течение короткого периода времени, но едва планирует новый действие на schedulerA, который делает всю работу. Таким образом, несколько методов subscribeOn не только игнорируются, но и создают небольшие накладные расходы.

7. Schedulers.io

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
      .setNameFormat(pattern)
      .build();
}

@Test
public void givenExecutors_whenSchedulerFrom_thenReturnElements() 
 throws InterruptedException {
 
    ExecutorService poolA = newFixedThreadPool(
      10, threadFactory("Sched-A-%d"));
    Scheduler schedulerA = Schedulers.from(poolA);
    ExecutorService poolB = newFixedThreadPool(
      10, threadFactory("Sched-B-%d"));
    Scheduler schedulerB = Schedulers.from(poolB);

    Observable<String> observable = Observable.create(subscriber -> {
      subscriber.onNext("Alfa");
      subscriber.onNext("Beta");
      subscriber.onCompleted();
    });;

    observable
      .subscribeOn(schedulerA)
      .subscribeOn(schedulerB)
      .subscribe(
        x -> result += Thread.currentThread().getName() + x + "_",
        Throwable::printStackTrace,
        () -> result += "_Completed"
      );
    Thread.sleep(2000);
    Assert.assertTrue(result.equals(
      "Sched-A-0Alfa_Sched-A-0Beta__Completed"));
}

Этот планировщик похож на newThread, за исключением того факта, что уже запущенные потоки перерабатываются и, возможно, могут обрабатывать будущие запросы.

«Эта реализация работает аналогично ThreadPoolExecutor из java.util.concurrent с неограниченным пулом потоков. Каждый раз, когда запрашивается новый рабочий поток, либо запускается новый поток (и позже некоторое время простаивает), либо повторно используется простаивающий:

Мы должны быть осторожны с неограниченными ресурсами любого рода — в случае медленных или не отвечающих внешних зависимостей, таких как веб-службы, планировщик ввода-вывода может запустить огромное количество потоков, что приведет к тому, что наше собственное приложение перестанет отвечать.

На практике использование Schedulers.io почти всегда является лучшим выбором.

Observable.just("io")
  .subscribeOn(Schedulers.io())
  .subscribe(i -> result += Thread.currentThread().getName());
 
Assert.assertTrue(result.equals("RxIoScheduler-2"));

8. Schedulers.computation

Планировщик вычислений по умолчанию ограничивает количество параллельно работающих потоков значением availableProcessors(), как это указано в служебном классе Runtime.getRuntime().

Таким образом, мы должны использовать планировщик вычислений, когда задачи полностью связаны с процессором; то есть они требуют вычислительной мощности и не имеют блокирующего кода.

Он использует неограниченную очередь перед каждым потоком, поэтому, если задача запланирована, но все ядра заняты, она будет поставлена ​​в очередь. Однако очередь непосредственно перед каждым потоком будет расти:

Если по какой-то причине нам нужно количество потоков, отличное от значения по умолчанию, мы всегда можем использовать системное свойство rx.scheduler.max-computation-threads. .

Используя меньшее количество потоков, мы можем гарантировать, что одно или несколько ядер ЦП всегда простаивают, и даже при большой нагрузке пул вычислительных потоков не перегружает сервер. Просто невозможно иметь больше вычислительных потоков, чем ядер.

Observable.just("computation")
  .subscribeOn(Schedulers.computation())
  .subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxComputationScheduler-1"));

9. Schedulers.test

Этот планировщик используется только в целях тестирования, и мы никогда не увидим его в рабочем коде. Его основным преимуществом является возможность опережать часы, имитируя произвольное прохождение времени:

10. Планировщики по умолчанию

Некоторые операторы Observable в RxJava имеют альтернативные формы, которые позволяют нам установить, какой планировщик будет использоваться оператором для его эксплуатация. Другие не работают с каким-либо конкретным планировщиком или не работают с конкретным планировщиком по умолчанию.

List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<Long> tick = Observable
  .interval(1, TimeUnit.SECONDS, scheduler);

Observable.from(letters)
  .zipWith(tick, (string, index) -> index + "-" + string)
  .subscribeOn(scheduler)
  .subscribe(subscriber);

subscriber.assertNoValues();
subscriber.assertNotCompleted();

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");

scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(3);
assertThat(
  subscriber.getOnNextEvents(), 
  hasItems("0-A", "1-B", "2-C"));

Например, оператор задержки принимает восходящие события и отправляет их вниз по течению через заданное время. Очевидно, что он не может удерживать исходный поток в течение этого периода, поэтому он должен использовать другой планировщик:

Без предоставления специального планировщикаA все операторы ниже задержки будут использовать планировщик вычислений.

Другими важными операторами, которые поддерживают пользовательские планировщики, являются буфер, интервал, диапазон, таймер, пропуск, взятие, тайм-аут и некоторые другие. Если мы не предоставляем планировщик таким операторам, используется планировщик вычислений, который в большинстве случаев является безопасным по умолчанию.

ExecutorService poolA = newFixedThreadPool(
  10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
  .delay(1, TimeUnit.SECONDS, schedulerA)
  .subscribe(i -> result+= Thread.currentThread().getName() + i + " ");

Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

11. Заключение

В действительно реактивных приложениях, для которых все длительные операции являются асинхронными, требуется очень мало потоков и, следовательно, планировщиков.

Освоение планировщиков необходимо для написания масштабируемого и безопасного кода с использованием RxJava. Разница между subscribeOn иObservOn особенно важна при высокой нагрузке, когда каждая задача должна выполняться именно тогда, когда мы ожидаем.

И последнее, но не менее важное: мы должны быть уверены, что планировщики, используемые ниже по течению, могут справиться с нагрузкой, создаваемой планировщиками выше по течению. Для получения дополнительной информации есть эта статья о обратном давлении.

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.

«

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.