«1. Обзор
В этой статье мы рассмотрим, как библиотека RxJava помогает нам справляться с противодавлением.
Проще говоря, RxJava использует концепцию реактивных потоков, вводя Observables, на которые может подписаться один или несколько наблюдателей. Работа с потенциально бесконечными потоками очень сложна, так как нам нужно столкнуться с проблемой противодавления.
Нетрудно попасть в ситуацию, когда Observable выдает элементы быстрее, чем подписчик может их потреблять. Мы рассмотрим различные решения проблемы растущего буфера неиспользованных предметов.
2. Hot Observables против Cold Observables
Во-первых, давайте создадим простую функцию-потребитель, которая будет использоваться как потребитель элементов из Observables, которые мы определим позже:
public class ComputeFunction {
public static void compute(Integer v) {
try {
System.out.println("compute integer v: " + v);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Наша функция calculate() просто печатая аргумент. Здесь важно отметить, что вызывается метод Thread.sleep(1000) — мы делаем это, чтобы эмулировать какую-то длительную задачу, которая заставит Observable заполниться элементами быстрее, чем Observer сможет их использовать.
У нас есть два типа Observables — Hot и Cold — которые совершенно разные, когда дело доходит до обработки противодавления.
2.1. Холодные наблюдаемые
Холодные наблюдаемые испускают определенную последовательность элементов, но могут начать испускать эту последовательность, когда ее Наблюдатель сочтет это удобным, и с любой скоростью, которую желает Наблюдатель, не нарушая целостность последовательности. Cold Observable предоставляет предметы ленивым способом.
Наблюдатель берет элементы только тогда, когда он готов обработать этот элемент, и элементы не нужно буферизовать в Observable, потому что они запрашиваются методом извлечения.
Например, если вы создаете Observable на основе статического диапазона элементов от одного до миллиона, этот Observable будет выдавать одну и ту же последовательность элементов независимо от того, как часто эти элементы наблюдаются:
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute);
Когда мы запускаем нашу программу, элементы будут вычисляться Observer лениво и запрашиваться в режиме pull. Метод Schedulers.computation() означает, что мы хотим запустить наш Observer в пуле потоков вычислений в RxJava.
Вывод программы будет состоять из результата метода calculate(), вызываемого для одного за другим элемента из Observable: в натяжной манере. Примеры элементов, испускаемых холодным Observable, могут включать результаты запроса к базе данных, поиска файлов или веб-запроса.
compute integer v: 1
compute integer v: 2
compute integer v: 3
compute integer v: 4
...
2.2. Горячие наблюдаемые
Горячие наблюдаемые начинают генерировать элементы и испускают их сразу же после их создания. Это противоречит вытягивающей модели обработки Cold Observables. Hot Observable испускает предметы в своем собственном темпе, и его наблюдатели должны не отставать.
Когда Observer не может потреблять элементы так же быстро, как они создаются Observable, их необходимо буферизовать или обрабатывать каким-либо другим способом, так как они будут заполнять память, что в конечном итоге приведет к OutOfMemoryException.
Давайте рассмотрим пример горячего Observable, который производит 1 миллион элементов для конечного потребителя, который обрабатывает эти элементы. Когда метод Compute() в Observer требует некоторого времени для обработки каждого элемента, Observable начинает заполнять память элементами, что приводит к сбою программы: мы не определили способ обработки перепроизводства Observable.
Примеры элементов, испускаемых горячим Observable, могут включать события мыши и клавиатуры, системные события или курсы акций.
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
IntStream.range(1, 1_000_000).forEach(source::onNext);
3. Буферизация избыточного Observable
Первый способ справиться с избыточным Observable — это определить своего рода буфер для элементов, которые не могут быть обработаны Observer.
Мы можем сделать это, вызвав метод buffer():
Определение буфера размером 1024 даст наблюдателю некоторое время, чтобы догнать источник с перепроизводством. Буфер будет хранить элементы, которые еще не были обработаны.
«Мы можем увеличить размер буфера, чтобы было достаточно места для производимых значений.
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.buffer(1024)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
Обратите внимание, однако, что, как правило, это может быть только временное исправление, поскольку переполнение все еще может произойти, если источник превышает прогнозируемый размер буфера.
4. Пакетирование выдаваемых элементов
Мы можем группировать перепроизводимые элементы в окнах из N элементов.
Когда Observable создает элементы быстрее, чем Observer может их обработать, мы можем облегчить эту ситуацию, сгруппировав созданные элементы вместе и отправив пакет элементов в Observer, который может обрабатывать набор элементов вместо одного элемента за другим: ~~ ~
Использование метода window() с аргументом 500 укажет Observable сгруппировать элементы в пакеты размером 500. Этот метод может уменьшить проблему перепроизводства Observable, когда Observer может обрабатывать пакет элементов быстрее, чем обработка элементов один за другим.
5. Пропуск элементов
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.window(500)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
Если некоторые значения, созданные Observable, можно безопасно игнорировать, мы можем использовать выборку в течение определенного времени и операторы дросселирования.
Методы sample() и ThrottleFirst() принимают продолжительность в качестве параметра:
Метод sample() периодически просматривает последовательность элементов и выдает последний элемент, который был создан в течение продолжительности, указанной в качестве параметра. Метод ThrottleFirst() выдает первый элемент, который был создан после продолжительности, указанной в качестве параметра.
Длительность — это время, после которого из последовательности созданных элементов выбирается один конкретный элемент. Мы можем указать стратегию обработки противодавления путем пропуска элементов:
-
Мы указали, что стратегия пропуска элементов будет методом sample(). Нам нужен образец последовательности продолжительностью 100 миллисекунд. Этот элемент будет передан наблюдателю.
Помните, однако, что эти операторы только снижают скорость получения значения нижестоящим наблюдателем и, таким образом, они все еще могут приводить к MissingBackpressureException.
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.sample(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
6. Обработка заполнения наблюдаемого буфера
В случае, если наши стратегии выборки или пакетирования элементов не помогают с заполнением буфера, нам необходимо реализовать стратегию обработки случаев, когда буфер заполняется.
Нам нужно использовать метод onBackpressureBuffer(), чтобы предотвратить исключение BufferOverflowException.
Метод onBackpressureBuffer() принимает три аргумента: емкость буфера Observable, метод, вызываемый при заполнении буфера, и стратегию обработки элементов, которые необходимо удалить из буфера. Стратегии переполнения находятся в классе BackpressureOverflow.
Существует 4 типа действий, которые могут быть выполнены при заполнении буфера:
ON_OVERFLOW_ERROR — это поведение по умолчанию, сигнализирующее об исключении BufferOverflowException, когда буфер заполнен. ON_OVERFLOW_DEFAULT — в настоящее время это то же самое, что и ON_OVERFLOW_ERROR. ON_OVERFLOW_DROP_LATEST — если произойдет переполнение, текущее значение будет просто проигнорировано, и только старые значения будут доставлены после того, как нижестоящий Observer запросит ON_OVERFLOW_DROP_OLDEST — удаляет самый старый элемент в буфере и добавляет к нему текущее значение ~~ ~ Давайте посмотрим, как указать эту стратегию:
Здесь наша стратегия обработки переполнения буфера заключается в удалении самого старого элемента в буфере и добавлении нового элемента, созданного Observable.
-
Обратите внимание, что последние две стратегии вызывают разрыв в потоке, поскольку они пропускают элементы. Кроме того, они не будут сигнализировать об исключении BufferOverflowException.
7. Отбрасывание всех избыточных элементов
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> {}, Throwable::printStackTrace);
Всякий раз, когда нижестоящий наблюдатель не готов принять элемент, мы можем использовать метод onBackpressureDrop(), чтобы удалить этот элемент из последовательности.
Мы можем думать об этом методе как о методе onBackpressureBuffer() с емкостью буфера, установленной на ноль, со стратегией ON_OVERFLOW_DROP_LATEST.
Этот оператор полезен, когда мы можем безопасно игнорировать значения из источника Observable (такие как движения мыши или текущие сигналы местоположения GPS), так как позже будут более актуальные значения:
«
«Метод onBackpressureDrop() устраняет проблему перепроизводства Observable, но его следует использовать с осторожностью.
8. Заключение
Observable.range(1, 1_000_000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.doOnNext(ComputeFunction::compute)
.subscribe(v -> {}, Throwable::printStackTrace);
В этой статье мы рассмотрели проблему перепроизводства Observable и способы борьбы с противодавлением. Мы рассмотрели стратегии буферизации, группирования и пропуска элементов, когда Observer не может потреблять элементы так же быстро, как они создаются Observable.
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.
«
The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.