首先感谢 扔物线 哥哥给的配图,实在太赞了。
基本结构
我们先来看一段最基本的代码,分析这段代码在 RxJava 中是如何实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
Observable.OnSubscribe onSubscriber1 = new Observable.OnSubscribe() { @Override public void call(Subscriber super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); } }; Subscriber subscriber1 = new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }; Observable.create(onSubscriber1) .subscribe(subscriber1); |
首先我们来看一下Observable.create的代码
1 2 3 4 5 6 7 |
public final static Observablecreate(OnSubscribe f) { return new Observable(hook.onCreate(f)); } protected Observable(OnSubscribe f) { this.onSubscribe = f; } |
直接就是调用了Observable的构造函数来创建一个新的Observable对象,这个对象我们暂时标记为observable1,以便后面追溯。
同时,会将我们传入的OnSubscribe对象onSubscribe1保存在observable1的onSubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。
接下来我们来看看subscribe方法。
1 2 3 4 5 6 7 8 9 10 |
public final Subscription subscribe(Subscriber super T> subscriber) { return Observable.subscribe(subscriber, this); } private static T> Subscription subscribe(Subscriber super T> subscriber, ObservableT> observable) { ... subscriber.onStart(); hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } |
可以看到,subscribe之后,就直接调用了observable1.onSubscribe.call方法,也就是我们代码中的onSubscribe1对象的call方法
,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onNext和onComplete方法。
这样就实现了观察者和被观察者之间的通讯,是不是很简单?
1 2 3 4 |
public void call(Subscriber super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); } |
lift
讲之前先上一个简单的lift流程图吧
lift方法是RxJava中实现自定义operator的关键,这里我们以最简单的map为例,来分析一下lift方法的工作原理,我们对上面的demo代码稍作修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
Observable.OnSubscribe onSubscriber1 = new Observable.OnSubscribe() { @Override public void call(Subscriber super String> subscriber) { subscriber.onNext("1"); subscriber.onCompleted(); } }; Subscriber subscriber1 = new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer i) { } }; Func1 transformer1 = new Func1() { @Override public Integer call(String s) { return Integer.parseInt(s); } }; Observable.create(onSubscriber1) .map(transformer1) .subscribe(subscriber1); |
和刚才不同的是我们在create之后调用了map方法,然后才调用subscribe方法。
map方法的代码如下:
1 2 3 |
public final Observablemap(Func1 super T, ? extends R> func) { return lift(new OperatorMap(func)); } |
一堆泛型参数是不是略晕啊,别急,我们慢慢来看。
首先来介绍一下Func这个接口。RxJava中有一系列Action+数字,Func+数字的接口,这些接口中都只有一个call方法,其中Action接口的call方法都没有返回值,
Func接口的call方法都有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。
其实主要是因为Java中函数不是一等公民,所以只能用接口这么啰嗦的格式,还好我们可以使用lambda简化我们的代码。(羡慕函数式语言)
这里map方法接收的参数类型为Func1 super T, ? extends R> func
,表示func的call方法接收一个T类型的参数,返回一个R类型的返回值。
OperatorMap又是什么鬼呢?
1 2 3 |
public final class OperatorMapT, R> implements OperatorR, T> public interface OperatorR, T> extends Func1Subscriber super R>, Subscriber super T>> |
这里可以看到OperatorMap继承自Operator, 而Operator又继承自Func1接口,也就是说Operator接口的call方法会接收一个Subscriber类型的参数,
并且返回另外一个Subscriber类型的对象。Operator.call方法返回一个Subscriber对象,其实我们可以这么理解,每一个operator也是一个订阅者,
它返回的Subscriber对象正好用来订阅Observable发出来的消息。
有一点需要注意的是OperatorMap和Operator的泛型参数顺序刚好是相反的,为什么要这么做呢?其实很简单,因为Operator本身是对Observable发出的数据
进行转换的,所以经常会出现operator转换之后返回的数据类型变了,而OperatorMap这里刚好颠倒了一下顺序,就可以保证call方法返回的Subscriber类型
可以订阅Observable发出的数据。
OperatorMap的代码我们先不看,先来看一下lift方法中都做了些啥吧。
1 2 3 4 5 6 7 8 9 10 |
public final Observablelift(final Operator extends R, ? super T> operator) { return new Observable(new OnSubscribe() { @Override public void call(Subscriber super R> o) { Subscriber super T> st = hook.onLift(operator).call(o); st.onStart(); onSubscribe.call(st); } }); } |
lift方法会返回一个新创建的Observable对象,这里我们给这个Observable一个标识observable2。observable2的onSubscribe属性就是lift中new出来的这个
OnSubscribe对象。
对照demo中的代码,我们调用map之后,就调用了subscribe方法,也就是调用了这里的observable2的subscribe方法。
根据上面的介绍,调用subscribe之后,就会调用observable2.onSubscribe.call方法,call中首先做的事情就是调用OperatorMap的call方法
1 2 3 4 5 6 7 8 9 |
@Override public Subscriber super T> call(final Subscriber super R> o) { |