资讯专栏INFORMATION COLUMN

RxJava2.x源码解析(一):订阅流程

harryhappy / 652人阅读

摘要:现在网上已经有大量的源码分析文章,各种技术的都有。你完全可以写成下面的链式风格方法会最先被执行同样,为了便于理解,我会借用流里面经常用到的水流进行类比。该子类的命名是有规律可言的。

现在网上已经有大量的源码分析文章,各种技术的都有。但我觉得很多文章对初学者并不友好,让人读起来云里雾里的,比源码还源码。究其原因,是根本没有从学习者的角度去分析。在自己完成了源码阅读之后,却忘记了自己是如何一步步提出问题,进而走到这里的。

所以,我想在本篇及以后的文章中,花更多的精力去进行源码的分析,争取用浅显易懂的语言,用适合的逻辑去组织内容。这样不至于陷入源码里,导致文章难懂。尽量让更多的人愿意去读源码。

阅读本文,你需要对 RxJava2 的一些基本使用有所了解,不过不用太深。这里推荐下Season_zlc的给初学者的RxJava2.0教程(一) ,比较浅显易懂。

提到 RxJava,你第一个想到的词是什么?

“异步”。

RxJava 在 GitHub 上的官网主页也说了,“RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.”(RxJava是一个使用可观测序列来组建异步、基于事件的程序的库,它是 Reactive Extensions 在Java虚拟机上的一个实现)。它的优点嘛,用扔物线凯哥的话讲,就是“简洁”,并且“随着程序逻辑变得越来越复杂,它依然能够保持简洁”。

这里要注意一点,虽然对大多数人来讲,更多的是使用 RxJava 来配合 Retrofit、OkHttp 进行网络请求框架的封装及数据的异步处理,但是,RxJava和网络请求本质上没有半毛钱的关系。它的本质,官网已经说的很明白了,就是“异步”。

RxJava 基于观察者模式实现,基于事件流进行链式调用。

首先,我们需要添加必要的依赖,这里以最新的2.2.8版本为例:

    implementation "io.reactivex.rxjava2:rxjava:2.2.8"

当然,对于 Android 项目来讲,我们一般还需要添加一个补充库:

    implementation "io.reactivex.rxjava2:rxandroid:2.1.0"

这个库其实就是提供了 Android 相关的主线程的支持。

然后写个简单的代码,就可以开始我们的源码分析啦。

        // 上游 observable
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });

        // 下游 observer
        Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                // onSubscribe 方法会最先被执行
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };

        // 将上游和下游进行关联
        observable.subscribe(observer);

为便于理解,我故意将可以链式调用的代码,拆成了三部分。你完全可以写成下面的链式风格:

 Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                // onSubscribe 方法会最先被执行
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

同样,为了便于理解,我会借用i/o流里面经常用到的水流进行类比。将被观察者 observable 称为上游(upstream),将观察者 observer 称为下游(downstream)。读源码其实也能看出,作者本身也正是这么类比的。

通过将整个过程拆分成三个步骤,能更清晰的理清逻辑。我们需要做的,本质上就是创建一个上游和一个下游,最终通过上游对象的subscribe方法将二者关联起来:

  1. 创建一个 Observable 的实现类

  2. 创建一个 Observer 的实现类

  3. 将二者通过 Observable 的 subscribe(…) 方法将二者进行关联

明白了这三点,以后我们就不会被各种实现类搞的眼花缭乱。

这三个步骤,里面的核心是第三部,也就是订阅过程,毕竟,这属于一个动作,而我们进行源码分析的时候,往往就是从动作开始的。这时候,我们Ctrl/Command + 鼠标左键,进入该方法看看,里面做了下什么。

    public final void subscribe(Observer<");super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // RxJavaPlugins是个钩子函数,用来在代码的执行前后插入进行一些操作
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            // 关键点是这行代码
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can"t call onError because no way to know if a Disposable has been set or not
            // can"t call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can"t throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

这里将this(上游Observable类型)的和下游observer作为参数传给了 RxJavaPlugins 的 onSubscribe(…)方法,并返回一个Observer,同时,将原来的observer指向这个返回值,那么我们看看这个函数中到底进行了什么操作:

    //  RxJavaPlugins.java
    public static  Observer<");super T> onSubscribe(@NonNull Observable source, @NonNull Observer<");super T> observer) {
        BiFunction<");super Observable, ");super Observer, ");        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }

这里判断onObservableSubscribe是否为 null,不为 null 则调用其 apply(…) 方法。若为 null ,则直接返回原来的observer。而该变量需要通过RxJavaPlugin的setOnSingleSubscribe(...)方法来指定的,显然,我们并没有指定,所以忽略不管(后面遇到类似问题,基本也都可以忽略)。

回到之前的订阅流程,就可以简化为下面这样:

    public final void subscribe(Observer<");super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            ...
            // 调用到具体实现子类的 subscribeActual(observer) 方法
            subscribeActual(observer);
        } catch (
            ...
        }
    }

从上面代码可以看出,订阅过程,即调用Observable的subscribe(...)的过程,其实就是直接调用了其实现类的subscribeActual(observer)方法(该方法在 Observable 中是个抽象方法)。以后我们遇到这个方法,就直接去 Observable 的实现类中找即可,就不会乱了。

一些熟悉RxJava的朋友可能会说,有时候我们通过subscribe(...)订阅的并不是Observer对象,而是consumer对象,有各种重载。如下:

当你传入的是Consumer的时候,不管你传递了几个参数,最终都会代用到以下方法,那些你没传递的 onError或者 onComplete 回调等等,会自动使用默认创建的值。

    public final Disposable subscribe(Consumer<");super T> onNext, Consumer<");super Throwable> onError,
            Action onComplete, Consumer<");super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        // 最终都会封装成一个 LambdaObserver,并作为参数传入subscribe(...)方法中
        LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

可以看出,这里最终还是将这些 Consumer 对象包装在了一个 LambdaObserver 类型的变量中,然后又调用了subscribe(...)方法,将其作为变量传入,之后的分析,就跟上面是一样的了。

订阅方法讲完了,我们也知道最终调用到了 Observable 的实现类的subscribeActual(...)方法。那接下来肯定就是要弄懂在这个方中做了什么事。我们例子中是使用Observable.create(...)方法创建的 observable:

        // 上游 observable
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });

其中,Observable.create(...)方法的实现是这样的:

    public static <T> Observable<T> create(ObservableOnSubscribe<Tsource) {
        ObjectHelper.requireNonNull(source"source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

我们传进去了一个实现了ObservableOnSubscribe接口的匿名内部类,该接口类也很简单,就定义了一个void subscribe(@NonNull ObservableEmitter emitter) throws Exception抽象方法。

然后我们将传进来的source(刚刚提到的匿名内部类ObservableOnSubscribe)封装进一个ObservableCreate对象中,又传进了RxJavaPlugins.onAssembly(...)中,这个RxJavaPlugins类刚才我们说过,其实就是一个hook类,暂时直接忽略,一般就是直接把传进来的参数返回了(不放心的话可以自己点进去,以后遇到该方法不再赘述)。

也就是说Observable.create(...)方法最终创建了一个ObservableCreate对象。注意,该对象是Observable抽象类的具体实现类。

特别注意!
特别注意!
特别注意!

重要事情说三遍。我们这里通过create(...)方法创建的Observable的具体实现子类是ObservableCreate。该子类的命名是有规律可言的。我在分析源码的时候有时候就想,这么多看起来名字都一样的类,RxJava的开发者本人不会懵逼吗?作为一个用户量这么大的库,肯定各种都有讲究,肯定有贵了。嗯。规律就是生成的子类的命名方法为“Observable+创建该类的方法名”,即:在创建该类的方法名称前面加上个Observable,以此来作为新的类

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/7367.html

相关文章

  • 「码个蛋」2017年200篇精选干货集合

    摘要:让你收获满满码个蛋从年月日推送第篇文章一年过去了已累积推文近篇文章,本文为年度精选,共计篇,按照类别整理便于读者主题阅读。本篇文章是今年的最后一篇技术文章,为了让大家在家也能好好学习,特此花了几个小时整理了这些文章。 showImg(https://segmentfault.com/img/remote/1460000013241596); 让你收获满满! 码个蛋从2017年02月20...

    wangtdgoodluck 评论0 收藏0
  • Rxjava2.x 源码解析(二): 线程切换

    摘要:这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。文章较长,可以耐心点,反复看看。 这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。 虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。 好...

    lowett 评论0 收藏0
  • Rxjava2.x 源码解析(二): 线程切换

    摘要:这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。文章较长,可以耐心点,反复看看。 这个上游是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。 虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。 好...

    LinkedME2016 评论0 收藏0

发表评论

0条评论

harryhappy

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<