当前位置: 首页 > news >正文

RxJava操作符变换过程

要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展

    implementation 'io.reactivex:rxandroid:1.2.1'implementation 'io.reactivex:rxjava:1.2.0'

我们在使用rxjava的操作符时都觉得很方便,但是rxjava是怎么实现操作符的转换呢,以下面的代码进行分析

String host = "https://baidu.com/";Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}}).map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}}).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});

上面的代码是链式调用,为了方便理解,我把上面的代码拆分成了下面样式

        String host = "https://baidu.com/";Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("message");subscriber.onCompleted();}});Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;}});obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriberLast onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug(s);}});

上面代码会打印,我们将一步一步分析,打印是怎么来的

https://www.baidu.com/message
subscriberLast onCompleted is come in

obs1是我们的原始Observable, obs2是我们变换过的Observable
首先从obs1的创建开始,就是Observable的创建过程,有不理解的可以先看RXJava的创建订阅过程
osb1的创建过程create函数需要一个OnSubscribe对象,为了方便理解,就暂不拆开来写,代码虽然用的是匿名对象,我们暂且叫它onSubscribe1

       Observable<String> obs1 = Observable.create(new Observable.OnSubscribe<String>() { //onSubscribe1@Overridepublic void call(Subscriber<? super String> subscriber) { subscriber.onNext("observable call onNext0");subscriber.onStart();subscriber.onNext("observable call onNext");subscriber.onCompleted();subscriber.onNext("observable call onNext1");}});

obs1创建成功后调用了map方法,map方法又返回了一个Observable,就是我们的obs2,同理,map方法的参数我们叫它func1

        Observable<String> obs2 = obs1.map(new Func1<String, String>() { //func1@Overridepublic String call(String s) {return host+s;}});

随后我们又使用了osb2进行了订阅,同理subscribe方法的参数我们叫它subscriberLast

         obs2.subscribe(new Subscriber<String>() { //subscriberLast@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug("subscriber onNext is come in s = "+s);}});

于是有了下面的关系,到这里还都很好理解
在这里插入图片描述然后我们来分析obs1.map调用了map方法,rxjava做了什么

public class Observable<T> {
....public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {return create(new OnSubscribeMap<T, R>(this, func));}public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}
....
}

map方法又调用了create方法,create方法的调用我们已经在RxJava的调用过程中讲过,传入的参数是一个OnSubscribe对象,OnSubscribeMap实现了OnSubscribe接口,我们把new出来的OnSubscribeMap对象暂时叫做onSubscribeMap1构造方法传入的参数是obs1func1, 那我们再来看OnSubscribeMap类

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {final Observable<T> source;final Func1<? super T, ? extends R> transformer;public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}@Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}}

我们知道在Observable创建好后,调用了subscribe方法就可以进行订阅了,最后调用的也是Observable创建时传入的OnSubscribe对象的call方法,以obs1的创建举例,也就是我们这里的onSubscribe1,不懂得去看RxJava的调用过程
因为我们最后执行subscribe订阅方法的是obs2那么也就会调用obs2的OnSubscribe对象,那么obs2的OnSubscribe对象是谁呢,就是onSubscribeMap1那么执行完订阅就会调用onSubscribeMap1的call方法

    @Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);source.unsafeSubscribe(parent);}

这里的source是OnSubscribeMap构造方法调用时初始化的也就是obs1,transformer 是我们的func1

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}

现在又有了下面的关系
在这里插入图片描述在onSubscribeMap1的call方法中,一共有三行代码,这三行代码很重要,我们一行一行分析

        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);

实例化一个MapSubscriber对象parent ,MapSubscriber继承了Subscriber,所以MapSubscriber也是一个观察者;parent 持有了o,o是也就是subscriberLast,还持有了transformer,也就是func1。这里一会要详细分析

        o.add(parent);

o是一个Subscriber,也是subscriberLast,上面的代码直接把parent添加到了subscriberLast的SubscriptionList列表。这里一会还要说一下。在看下一条代码

        source.unsafeSubscribe(parent);

我们知道source也就是我们的obs1obs1的订阅操作就是在这里发生的。至于为什么用unsafeSubscribe方法, 我们一会在分析

我们重新梳理一下,
1、obs2的订阅方法subscribe执行导致了obs2的onSubscribeMap1实例的call方法被执行;
2、onSubscribeMap1的call方法中又执行了obs1的订阅;obs1的观察者就是parent;
3、obs1的订阅必然会导致obs1 的onSubscribe实例onSubscribe1的call方法被执行。
4、在onSubscribe1的call方法中我们又调用了

               subscriber.onNext("message");subscriber.onCompleted();

这里的subscriber也就是parent,必然会调用parent的next方法并传入message,和onCompleted。
5、继续分析parent

 static final class MapSubscriber<T, R> extends Subscriber<T> {final Subscriber<? super R> actual;final Func1<? super T, ? extends R> mapper;boolean done;public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}@Overridepublic void onError(Throwable e) {if (done) {RxJavaHooks.onError(e);return;}done = true;actual.onError(e);}@Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}@Overridepublic void setProducer(Producer p) {actual.setProducer(p);}}

parent是MapSubscriber,MapSubscriber继承了Subscriber,MapSubscriber的构造方法需要两个参数Subscriber和Func1,通过之前的分析,知道actual就是subscriberLast,mapper就是fun1

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {this.actual = actual;this.mapper = mapper;}

用一张图来说明下
在这里插入图片描述

 static final class MapSubscriber<T, R> extends Subscriber<T> {.....@Overridepublic void onNext(T t) {R result;try {result = mapper.call(t);  //这里调用,mapper就是fun1,的call方法,这里的t就是message,result就是转换后的字符串} catch (Throwable ex) {Exceptions.throwIfFatal(ex);unsubscribe();onError(OnErrorThrowable.addValueAsLastCause(ex, t));return;}actual.onNext(result);}.....
}

在parent的onnext方法中调用了func1的call方法,还记得我们在func1的call方法中写的什么,没错就是转换字符串,call方法的返回值就是转换后的字符串

        final String host = "https://www.baidu.com/";Observable<String> obs2 = obs1.map(new Func1<String, String>() {@Overridepublic String call(String s) {return host+s;  //func1的call方法 进行了字符串转换,这里的s就是message}});

继续调用了

actual.onNext(result);

我们知道actual就是subscriberLast,所以会调用subscriberLast的onNext方法,

       obs2.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriberLast onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) { //subscriberLast的onNext方法ILog.LogDebug(s);//最后会打印https://www.baidu.com/message}@Overridepublic void onStart() {super.onStart();}});

在obs1中我们还调用了onCompleted方法,先调用了actual.onCompleted();
actual也就是subscriberLast实例。

        @Overridepublic void onCompleted() {if (done) {return;}actual.onCompleted();}

在这添加一张图方便更好的理解
在这里插入图片描述简单的说就是先调用obs1的subscriber的onNext()方法,在onNext()方法中调用func1的call方法,处理数据源数据,然后再把处理过的数据源发射给obs2的subscriber

还记不记得上面的第二条代码 o.add(parent);为什么要把parent添加到o中呢,o也就是subscriberLast。

    @Overridepublic void call(final Subscriber<? super R> o) {MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);o.add(parent);//第二条代码source.unsafeSubscribe(parent);//第三条代码}

subscriberLast的add方法最终会把parent添加到subscriberLast的SubscriptionList中,关于Subscriber请看RxJava中的Subscriber。再执行SubscriptionList的解绑方法unsubscribe会把subscriptions中的Subscription一并解绑,也就是会把parentobs1的绑定关系解除。
那上面第三条代码为什么调用的是unsafeSubscribe方法呢,记得我们之前分析RxJava的订阅过程中看到的是最后包装了一个SafeSubscriber,再SafeSubscriber中会进行一些多线程的处理操作。

        /** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}

observable中unsafeSubscribe方法也很简单,也没并由对传入的subscriber进行包装而是直接调用。

public class Observable<T> {
....
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {try {// new Subscriber so onStart itsubscriber.onStart();// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}return Subscriptions.unsubscribed();}}
....

我们回过来再看MapSubscriber中其实也已经进行了简单的处理工作,所以也就不需要使用SafeSubscriber了。
至此RxJava操作符变换过程就分析完了,欢迎大家补充和纠正。

http://www.lryc.cn/news/38429.html

相关文章:

  • 开放平台订单接口
  • CDN相关知识点
  • 【论文阅读】注意力机制与二维 TSP 问题
  • [深入理解SSD系列 闪存实战2.1.7] NAND FLASH基本编程(写)操作及原理_NAND FLASH Program Operation 源码实现
  • PMP项目管理项目资源管理
  • 程序的编译和链接
  • Win11的两个实用技巧系列之无法联网怎么办、耳机没声音的多种解决办法
  • 【微信小程序】-- 自定义组件 - 数据监听器 - 案例 (三十五)
  • Linux - 第7节 - 进程间通信
  • # 数据完整性算法在shell及python中的实践
  • QEMU启动x86-Linux内核
  • C/C++每日一练(20230311)
  • 哪个牌子的洗地机耐用?耐用的洗地机推荐
  • 搭建一个中心化的定时服务
  • 【CSS】快速入门笔记
  • 第161篇 笔记-去中心化的含义
  • 「计算机组成原理」数据的表示和运算(二)
  • 建立自己的博客
  • Docker 安装mysql Mac 环境下
  • 《C++代码分析》第三回:类成员函数覆盖父类函数的调用(分析this指针的变化)
  • Altium designer--软件简介及安装教程(Altium designer16)
  • Windows系统下基于开源软件的多物理场仿真
  • 【STL】list剖析及模拟实现
  • Go打包附件内容到执行文件
  • Spring的配置属性
  • 132.《render-props, Hoc,自定义hooks 详解》
  • 通过Session共享数据验证码进行用户登录
  • C++STL详解(六)——stack和queue
  • javaEE 初阶 — CSS 的 基本语法 与 引入方式
  • QEMU启动ARM32 Linux内核