«1. Введение

В этой статье мы рассмотрим, как обрабатывать исключения и ошибки с помощью RxJava.

Во-первых, имейте в виду, что Observable обычно не генерирует исключений. Вместо этого по умолчанию Observable вызывает метод onError() своего наблюдателя, уведомляя наблюдателя о том, что только что произошла неисправимая ошибка, а затем завершает работу, не вызывая больше никаких методов своего наблюдателя.

Операторы обработки ошибок, которые мы собираемся ввести, изменяют поведение по умолчанию, возобновляя или повторяя последовательность Observable.

2. Зависимости Maven

Сначала добавим RxJava в pom.xml:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.3</version>
</dependency>

Последнюю версию артефакта можно найти здесь.

3. Обработка ошибок

Когда возникает ошибка, нам обычно нужно каким-то образом ее обработать. Например, измените связанные внешние состояния, возобновив последовательность с результатами по умолчанию, или просто оставьте все как есть, чтобы ошибка могла распространяться.

3.1. Действие при ошибке

С помощью doOnError мы можем вызвать любое действие, которое необходимо при возникновении ошибки:

@Test
public void whenChangeStateOnError_thenErrorThrown() {
    TestObserver testObserver = new TestObserver();
    AtomicBoolean state = new AtomicBoolean(false);
    Observable
      .error(UNKNOWN_ERROR)
      .doOnError(throwable -> state.set(true))
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
 
    assertTrue("state should be changed", state.get());
}

В случае возникновения исключения при выполнении действия RxJava заключает исключение в CompositeException: ~ ~~

@Test
public void whenExceptionOccurOnError_thenCompositeExceptionThrown() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .doOnError(throwable -> {
          throw new RuntimeException("unexcepted");
      })
      .subscribe(testObserver);

    testObserver.assertError(CompositeException.class);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
}

3.2. Возобновление с элементами по умолчанию

Хотя мы можем вызывать действия с помощью doOnError, ошибка все равно нарушает стандартный поток последовательности. Иногда мы хотим возобновить последовательность с опцией по умолчанию, что и делает onErrorReturnItem:

@Test
public void whenHandleOnErrorResumeItem_thenResumed(){
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorReturnItem("singleValue")
      .subscribe(testObserver);
 
    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("singleValue");
}

Если предпочтителен поставщик динамических элементов по умолчанию, мы можем использовать onErrorReturn:

@Test
public void whenHandleOnErrorReturn_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorReturn(Throwable::getMessage)
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("unknown error");
}

3.3. Возобновление с другой последовательностью

Вместо того, чтобы возвращаться к одному элементу, мы можем предоставить резервную последовательность данных, используя onErrorResumeNext при обнаружении ошибок. Это поможет предотвратить распространение ошибки:

@Test
public void whenHandleOnErrorResume_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorResumeNext(Observable.just("one", "two"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(2);
    testObserver.assertValues("one", "two");
}

Если резервная последовательность отличается в зависимости от конкретных типов исключений или последовательность должна быть сгенерирована функцией, мы можем передать функцию в onErrorResumeNext:

@Test
public void whenHandleOnErrorResumeFunc_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorResumeNext(throwable -> Observable
        .just(throwable.getMessage(), "nextValue"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(2);
    testObserver.assertValues("unknown error", "nextValue");
}

~~ ~ 3.4. Обрабатывать только исключение

RxJava также предоставляет резервный метод, который позволяет продолжить последовательность с предоставленным Observable, когда возникает исключение (но не ошибка):

@Test
public void whenHandleOnException_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_EXCEPTION)
      .onExceptionResumeNext(Observable.just("exceptionResumed"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("exceptionResumed");
}

@Test
public void whenHandleOnException_thenNotResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onExceptionResumeNext(Observable.just("exceptionResumed"))
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
}

Как показано в приведенном выше коде, когда возникает ошибка, onExceptionResumeNext не сработает, чтобы возобновить последовательность.

4. Повторите попытку при ошибке

Нормальная последовательность может быть нарушена из-за временного сбоя системы или ошибки серверной части. В этих ситуациях мы хотим повторить попытку и подождать, пока последовательность не будет исправлена.

К счастью, RxJava дает нам возможность сделать именно это.

4.1. Повторить

При использовании повторной попытки Observable будет переподписываться бесконечное количество раз до тех пор, пока не возникнет ошибка. Но в большинстве случаев мы бы предпочли фиксированное количество попыток:

@Test
public void whenRetryOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
          atomicCounter.incrementAndGet();
          return UNKNOWN_ERROR;
      })
      .retry(1)
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should try twice", atomicCounter.get() == 2);
}

4.2. Повторить по условию

Условный повтор также возможен в RxJava, используя повтор с предикатами или используя retryUntil:

@Test
public void whenRetryConditionallyOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
          atomicCounter.incrementAndGet();
          return UNKNOWN_ERROR;
      })
      .retry((integer, throwable) -> integer < 4)
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should call 4 times", atomicCounter.get() == 4);
}

@Test
public void whenRetryUntilOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(UNKNOWN_ERROR)
      .retryUntil(() -> atomicCounter.incrementAndGet() > 3)
      .subscribe(testObserver);
    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should call 4 times", atomicCounter.get() == 4);
}

4.3. RetryWhen

Помимо этих основных опций, есть также интересный метод повтора: retryWhen.

Это возвращает Observable, скажем «NewO», который выдает те же значения, что и исходный ObservableSource, скажем, «OldO», но если возвращаемый Observable «NewO» вызывает onComplete или onError, onComplete или onError подписчика будут вызываться.

И если «NewO» выдает какой-либо элемент, будет активирована повторная подписка на исходный ObservableSource «OldO».

Приведенные ниже тесты показывают, как это работает:

@Test
public void whenRetryWhenOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    Exception noretryException = new Exception("don't retry");
    Observable
      .error(UNKNOWN_ERROR)
      .retryWhen(throwableObservable -> Observable.error(noretryException))
      .subscribe(testObserver);

    testObserver.assertError(noretryException);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
}

@Test
public void whenRetryWhenOnError_thenCompleted() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
        atomicCounter.incrementAndGet();
        return UNKNOWN_ERROR;
      })
      .retryWhen(throwableObservable -> Observable.empty())
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    assertTrue("should not retry", atomicCounter.get()==0);
}

@Test
public void whenRetryWhenOnError_thenResubscribed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
        atomicCounter.incrementAndGet();
        return UNKNOWN_ERROR;
      })
      .retryWhen(throwableObservable -> Observable.just("anything"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    assertTrue("should retry once", atomicCounter.get()==1);
}

Типичное использование retryWhen ограничено повторными попытками с переменной задержкой:

@Test
public void whenRetryWhenForMultipleTimesOnError_thenResumed() {
    TestObserver testObserver = new TestObserver();
    long before = System.currentTimeMillis();
    Observable
      .error(UNKNOWN_ERROR)
      .retryWhen(throwableObservable -> throwableObservable
        .zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
        .flatMap(integer -> Observable.timer(integer, TimeUnit.SECONDS)))
      .blockingSubscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    long secondsElapsed = (System.currentTimeMillis() - before)/1000;
    assertTrue("6 seconds should elapse",secondsElapsed == 6 );
}

Обратите внимание, что эта логика повторяет три попытки и постепенно задерживает каждую повторную попытку.

5. Резюме

В этой статье мы представили несколько способов обработки ошибок и исключений в RxJava.

Есть также несколько специфичных для RxJava исключений, связанных с обработкой ошибок — более подробную информацию можно найти на официальной вики.

Как всегда, полную реализацию можно найти на Github.