RxJava如何结合观察者与链式处理

 Author: Dorae

Date: 2018年12月3日17:10:31
转载请注明出处


一、概述

首先问自己几个问题,如果非常清楚这几个问题的目的与答案,那么恭喜你,不用继续往下看了-_-。

  1. RxJava是干什么的;
  2. 链式调用中当存在数个Observable.subscribeOn()时的处理方式;
  3. 链式调用中当存在数个Observable.observeOn()时的处理方式;
  4. 数据是如何经过操作符进行的处理。

回顾

观察者模式

如图1-1所示

图 1-1

Java8的stream

图 3-1

现在我们再来看看code1-1,其最终形成的Observable链如图3-2所示,每次调用map、filter等操作,都会生成一个新对象,并且保持了一个对上游的引用(用于生成Observer链)。

图 3-2

Observer链如图3-3所示,整个事件流程由CreateEmitter触发,最终交由我们的实现Observer$1处理。

图 3-3

看了上边几张图之后,是不是感觉清晰了很多?那么让我们进一步看下Rxjava如何完成了一键线程切换。

四、RxJava如何实现线程切换

通常我们使用RxJava的线程切换功能时,只需要在调用链中加上一句subscribeOn()或observeOn(),其中Scheduler如上所述,其实就是一个包装了ThreadPool的调度器。那么我们先来看下相关源码。

1、subscribeOn

如代码code4-1所示,为subscribeOn的核心代码。很明显,其中在新线程中只是简单的直接调用了source,也就是说这里之后的所有操作均在一个新线程中进行,和单线程并没有什么区别。

code 4-1

public final Observable<T> subscribeOn(Scheduler scheduler) {     return new ObservableSubscribeOn<T>() {         @Override         public void subscribeActual(final Observer<? super T> observer) {             scheduler.createWorker().schedule(new SubscribeTask() {                 @Override                 public void run() {                     source.subscribe(e);                 }             });         }     }; }

2、observeOn

如代码段code4-2所示,为observeOn的核心逻辑,可以看出其在订阅阶段(生成Observer链的阶段)还是在当前线程执行,只有触发之后,到了ObserverOn的Observer的节点时才会真正的切换到新线程。

code 4-2

public final Observable<T> observeOn(Scheduler scheduler) {     return new ObservableOnSubscribe<T>() {         @Override         public void subscribeActual(@NonNull Observer<Object> e) {              source.subscribe(new Observer<T>() {                  @Override                 public void onNext(T var1) {                     scheduler.createWorker().schedule(new Runnable() {                         @Override                         public void run() {                             e.onNext(var1);                         }                     });                 }             });         }     }; }

多次Observable.subscribeOn()、多次Observable.observeOn()会发生什么

通过上述code4-1、code4-2的分析,是不是可以推断出当多次subscribeOn时会发生什么?没错,虽然每次subscribeOn都会产生一次线程切换,但是真正起作用的只有最开始的一次subscribeOn,也就相当于只在最初的位置调用了subscribeOn;对于observeOn也是类似,每次都会产生新线程,但是每次都会产生一定的影响,也就是每个线程都承担了一部分工作。

关键字:

50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信