资讯专栏INFORMATION COLUMN

RxJava中操作符到底做了什么?

sunny5541 / 1767人阅读

摘要:今年彻底火了一把,其中最牛逼之处就是操作符了,以前只知道怎么用,这几天看了看源码,大致的弄清楚了操作符的工作过程,今天分享给大家。如果有什么不对地方,请大家多多指教。今天我们已为例,看代码一个很简单的小例子,用过滤操作符找出大于等于的数字。

    RxJava今年彻底火了一把,其中最牛逼之处就是操作符了,以前只知道怎么用,这几天看了看源码,大致的弄清楚了操作符的工作过程,今天分享给大家。如果有什么不对地方,请大家多多指教。

    今天我们已filter为例,看代码:

</>复制代码

  1. Integer[] datas={1,2,3,4,5,6,7,8,9,10};
  2. Observable.from(datas)
  3. .filter(new Func1() {
  4. @Override
  5. public Boolean call(Integer integer) {
  6. return integer>=5;
  7. }
  8. })
  9. .subscribe(new Action1() {
  10. @Override
  11. public void call(Integer integer) {
  12. mText.append(integer.toString()+",");
  13. }
  14. });

     一个很简单的小例子,用过滤操作符 filter 找出大于等于5的数字。我们点进去看看源码中filter做了什么

</>复制代码

  1. public final Observable filter(Func1 predicate) {
  2. return create(new OnSubscribeFilter(this, predicate));
  3. }

    调用了create()方法,等等我们什么时候是不是也用过create() 方法,我们在创建Observable时候也用过create()方法,原来创建了一个新的Observable返回出去了,那岂不是说我们的订阅者其实订阅的是这个新的Observable,我们继续往下看create方法,create方法需要的参数是一个OnSubscribe对象,那我们可以确定
OnSubscribeFilter是OnSubscribe的一个实现类,我们点进去看看。

</>复制代码

  1. public final class OnSubscribeFilter implements OnSubscribe {
  2. final Observable source;
  3. final Func1 predicate;
  4. public OnSubscribeFilter(Observable source, Func1 predicate) {
  5. this.source = source;
  6. this.predicate = predicate;
  7. }

    果然不出我们所料,OnSubscribeFilter是OnSubscribe的实现类,我们看他的构造方法,传递了两个参数,第一个参数Observable对象,一个Func1,其中第一个参数就是我们我们自己创建的那个Observable,第二个参数使我们在外面写的Func1,然后保存了起来。我们都知道在subscribe()订阅的时候,OnSubscribe的call()方法。我们看看OnSubscribeFilter的call()方法都干了些什么

</>复制代码

  1. @Override
  2. public void call(final Subscriber child) {
  3. FilterSubscriber parent = new FilterSubscriber(child, predicate);
  4. child.add(parent);
  5. source.unsafeSubscribe(parent);
  6. }

</>复制代码

  1.     出现了一个FilterSubscriber,什么鬼玩意儿,我们看看他是什么鬼

</>复制代码

  1. static final class FilterSubscriber extends Subscriber {
  2. final Subscriber actual;
  3. final Func1 predicate;
  4. boolean done;
  5. public FilterSubscriber(Subscriber actual, Func1 predicate) {
  6. this.actual = actual;
  7. this.predicate = predicate;
  8. request(0);
  9. }
  10. @Override
  11. public void onNext(T t) {
  12. boolean result;
  13. try {
  14. result = predicate.call(t);
  15. } catch (Throwable ex) {
  16. Exceptions.throwIfFatal(ex);
  17. unsubscribe();
  18. onError(OnErrorThrowable.addValueAsLastCause(ex, t));
  19. return;
  20. }
  21. if (result) {
  22. actual.onNext(t);
  23. } else {
  24. request(1);
  25. }
  26. }
  27. @Override
  28. public void onError(Throwable e) {
  29. if (done) {
  30. RxJavaHooks.onError(e);
  31. return;
  32. }
  33. done = true;
  34. actual.onError(e);
  35. }
  36. @Override
  37. public void onCompleted() {
  38. if (done) {
  39. return;
  40. }
  41. actual.onCompleted();
  42. }
  43. @Override
  44. public void setProducer(Producer p) {
  45. super.setProducer(p);
  46. actual.setProducer(p);
  47. }
  48. }
  49. }

    一个Subscriber的子类,我们看他的构造方法,两个参数,一个Subscriber一个Func1,我们在创建对象时候Subscriber对象是我们真正的从外界传过来的观察者,Func1呢使我们创建OnSubscribeFilter时候传递进来的对象,也就是我们在外界定义的Func1。
    回过头来我们继续看OnSubscribeFilter的call方法。我们看到source.unsafeSubscribe(parent),source是我们原来外界的Observable,他订阅了FilterSubscriber对象。我们在他的onNext方法中看到他根据func1.call(t)的返回值来判断是否让我们外界的真正的观察者调用onNext方法。
    看到这里有没有恍然大悟,啥?我都不知道你在说啥,额,那我们整体的屡屡。

    我们外界的代码,在subscribe()时候,Subscriber并不是订阅了我们自己写的Observable,Subscriber订阅的是filter方法返回的那个新的Observable对象,所以订阅时候会调用OnSubscribeFilter的call方法,OnSubscribeFilter才是我们订阅的被观察者的onSubscribe对象,在OnSubscribeFilter的call()方法中,我们让我们包装的FilterSubscriber订阅我们原来的被观察者,也就是我们在外界生成的那个Observable。我们在外界的Observable的onSubscribe对象的call方法中得到的观察者是FilterSubscriber对象,我们调用的onNext会回调到FilterSubscriber的onNext方法中。在FilterSubscriber的onNext方法中我们根据我们传递的Func1来判断是否要回调真正的Subscriber的onNext方法,在为true的时候我们才回调我们外界的观察者的onNext方法,也就起到了过滤的作用。这就是Filter的整个的流程。

    我们来测试下我们的小结论:

</>复制代码

  1. Observable.create(new Observable.OnSubscribe() {
  2. @Override
  3. public void call(Subscriber subscriber) {
  4. Log.e("call:subscriber", "" + subscriber.getClass().getCanonicalName());
  5. subscriber.onNext(5);
  6. }
  7. }).filter(new Func1() {
  8. @Override
  9. public Boolean call(Integer integer) {
  10. return integer > 0;
  11. }
  12. }).subscribe(new Action1() {
  13. @Override
  14. public void call(Integer integer) {
  15. }
  16. });

不知道大家看明白没有,非常愿意和大家一起讨论,一起学习,欢迎留言

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66244.html

相关文章

  • 扔物线:给 Android 开发者的 RxJava 详解

    摘要:观察者模式面向的需求是对象观察者对对象被观察者的某种变化高度敏感,需要在变化的一瞬间做出反应。规定,当不会再有新的发出时,需要触发方法作为标志。在事件处理过程中出异常时,会被触发,同时队列自动终止,不允许再有事件发出。 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使...

    tianren124 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<