Как не сойти с ума от RxJava - глубокое погружение

Всем привет, меня зовут Руслан, я Head of mobile development в одной международной компании. В нашей производственной практике достаточно много проектов используют для упрощенной работы с асинхронщиной фреймворк RxJava.

Обычно изучение RxJava в большинстве статей или онлайн-школ начинается со слов «Жил был Observable/Single/Flowable и мы решили на него подписаться».

После всего этого, как правило идёт пару слов про операторы, усиленный разбор отличий map от flatMap, concatMap, switchMap (мне сразу вспоминается среднестатистическое собеседование в какой-нибудь компании). Дальше идет что-то не очень внятное и совсем теоретическое про горячие источники и на этом всё.

В реальности, начинающий Android разработчик либо начал с coroutines и flow, либо шлёпает RxJava цепочки по одному и тому же алгоритму:

auth(credentials) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ response -> Log.d("RESPONSE", response.toString()) }, { throwable -> Log.d("ERROR", throwable.localizedMessage) })

Красота да? У нас есть цепочка, которая что-то получает от бэкэнда, даже работает! Но, в действительности мы даже не представляем как она работает.

Начитавшись умных статей о том, что RxJava построена на основе паттерну Observer мы думаем - Ну вот метод auth(), это издатель, а subscribe это подписчик, subscribeOn - устанавливает стратегию на каком пуле потоков будет работать издатель, а observeOn - определяет на каком пуле потоков будет получать данные наш подписчик, которого мы бережно поместили внутрь метода subscribe.

На этом можно было бы заканчивать статью, но увы, не всё так, как кажется на самом деле. Нет, метод auth(), это действительно источник, а subscribe - подписчик, с одной лишь оговоркой, ПОДПИСЧИК ЗАМЫКАЮЩИЙ ЦЕПОЧКУ (ну т.е. Вызов метода subscribe вернет некий Disposable). Отсюда назревает резонный вопрос, а что бывают какие-то ещё подписчики? Представляете, бывают!

Вот, век живи, век учись, каждый раз работая на проектах компании, где есть RxJava, я открываю для себя её по новому, бесконечный ящик пандоры. Окей, давайте ближе к сути.

Из курсов нам говорят, каждый оператор возвращает нам новый экземпляр источника с видоизмененными данными (если мы применяем какие-то операторы трансформации, комбинации, сортировки и т.д.), но нам забыли упомянуть одну важную вещь…

Каждый оператор это источник, внутри которого есть свой подписчик! Прикиньте? Чтоб в этом убедиться, давайте рассмотрим реализацию функции take под капотом:

//Original source from RxJava3 library public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> { final long limit; public ObservableTake(ObservableSource<T> source, long limit) { super(source); this.limit = limit; } @Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(new TakeObserver<>(observer, limit)); } static final class TakeObserver<T> implements Observer<T>, Disposable { final Observer<? super T> downstream; boolean done; Disposable upstream; long remaining; TakeObserver(Observer<? super T> actual, long limit) { this.downstream = actual; this.remaining = limit; } @Override public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { upstream = d; if (remaining == 0) { done = true; d.dispose(); EmptyDisposable.complete(downstream); } else { downstream.onSubscribe(this); } } } @Override public void onNext(T t) { if (!done && remaining-- > 0) { boolean stop = remaining == 0; downstream.onNext(t); if (stop) { onComplete(); } } } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } done = true; upstream.dispose(); downstream.onError(t); } @Override public void onComplete() { if (!done) { done = true; upstream.dispose(); downstream.onComplete(); } } @Override public void dispose() { upstream.dispose(); } @Override public boolean isDisposed() { return upstream.isDisposed(); } } }

Шок, правда? Т.е. У нас каждый оператор подписывается друг на друга в цепочке и к примеру наличие doOnTerminate{ exitProcess(0) } будет давать разный результат в зависимости от его местоположения в цепочке:

Single.just(1) .subscribeOn(Schedulers.newThread()) .doOnSuccess { logger.warning("First Single on: "+Thread.currentThread().name) } .observeOn(Schedulers.io()) .doOnTerminate { exitProcess(0) } .doOnError { throwable -> logger.warning(throwable.localizedMessage) } .subscribe( { logger.warning("Root subscribe(): "+Thread.currentThread().name) }, { throwable -> logger.warning(throwable.localizedMessage) } )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxCachedThreadS

Вопрос - а где лог с Root subscribe(): "+Thread.currentThread().name - Это нормальное поведение, у нас ведь выполняется метод

doOnTerminate { exitProcess(0) }

который завершает программу, просто он выполняется по очереди со всеми операторами, а не когда корневая цепочка завершит своё выполнение. Убедиться в этом можно, если переставить его в самое начало Rx - цепочки, после выполнения такого алгоритма вы не увидите никаких логов, программа завершится до их появления.

Теперь, держите эту информацию в уме, потому что дальше начнутся странные странности, которые без понимания вот этого материала не объяснить.

Всё было бы так просто, если бы не было так сложно. Я приведу пример:

Single.just(1) .subscribeOn(Schedulers.newThread()) .doOnSuccess { logger.warning("Current thread: "+Thread.currentThread().name) } .observeOn(Schedulers.computation()) .doOnSuccess { logger.warning("Current thread after observeOn: "+Thread.currentThread().name) } .subscribeOn(Schedulers.io()) .doOnError { throwable -> logger.warning(throwable.localizedMessage) } .subscribe( { logger.warning("Root subscribe(): "+Thread.currentThread().name) }, { throwable -> logger.warning(throwable.localizedMessage) } )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxComputationThreadPool-1

WARNING: Root subscribe(): RxComputationThreadPool-1

Вполне реальная ситуация, которая может вызвать ступор после радужного subscribeOn.observeOn. Благо, в документации на гите RxJava об этом написано. Пишут, что нельзя больше одного раза в корневой цепочке вызвать subscribeOn, а вот observeOn можно вызывать сколько угодно. Правило да правило, вот и живи теперь с этим. Ладно, на самом деле subscribeOn можно вызвать сколько угодно раз, но во второстепенных цепочках, которые к примеру вызываются внутри оператора flatMap, но поведение в корневой цепочке будет максимально неожиданным:

Single.just(1) .subscribeOn(Schedulers.newThread()) .doOnSuccess { logger.warning("Current thread: "+Thread.currentThread().name) } .observeOn(Schedulers.computation()) .doOnSuccess { logger.warning("Current thread after observeOn: "+Thread.currentThread().name) } .flatMap { Single.just(2).subscribeOn(Schedulers.io()) } .doOnError { throwable -> logger.warning(throwable.localizedMessage) } .subscribe( { logger.warning("Root subscribe(): "+Thread.currentThread().name) }, { throwable -> logger.warning(throwable.localizedMessage) } )

OUTPUT:

WARNING: Current thread: RxNewThreadScheduler-1

WARNING: Current thread after observeOn: RxComputationThreadPool-1

WARNING: Root subscribe(): RxCachedThreadScheduler-1

Оказывается, в RxJava есть два ключевых понятия, характеризующих порядок работы цепочки - upstream и downstream.

Ниже на скрине я нарисую что такое upstream и downstream:

Как не сойти с ума от RxJava - глубокое погружение

Смысл этих двух терминов в том, что подписка происходит вверх по течению upstream, а выброс данных вниз по течению downstream. Давайте заглянем под капот функции subscribeOn, интересно же, почему в случае без flatMap у нас поток не переключился второй раз через subscribeOn на IO пулл потоков:

Как не сойти с ума от RxJava - глубокое погружение

Вот это поворот! Оказывается внутри функции subscribeOn мы делаем replace передаваемого экземпляра пула потоков и этот replace работает снизу вверх, проходя по КОРНЕВОЙ цепочке, тот вызов subscribeOn который будет самым первым сверху, тот и установит реальный последний примененный пул потоков выполнения, не зря же он называется subscribeOn, при подписке, upstream. Интересно, а что же тогда происходит с observeOn, почему его можно вызвать много раз? Всё просто, у observeOn под капотом тот же replace, но только сверху вниз (downstream), именно по этому он сменится столько раз, сколько мы захотим.

Вы ещё держите в уме, что операторы друг на друга подписываются? Теперь сможете ответить на вопрос, почему subscribeOn во второстепенной цепи меняет поведение корневой? Думаю очевидно.

Если вам понравилась моя статья, подписывайтесь на мой телеграмм-канал

33
2 комментария

Стоило перестать читать хабр, как он пришёл за мной сюда. Спасибо за статью, залип на весь перекур

1
Ответить

Благодарю! Буду стараться и дальше публиковать что-то интересное из реальной практики

1
Ответить