Rxjava2源码浅析(一)

时间:2021-12-02 17:43:33

面试的时候被问道各种框架的原理架构,也是很尴尬,自以为写的代码不少,用过的框架也不少,深入的去研究源码的还真是不多,也是给自己敲了一个警钟,今天就来尝试剖析一下Rxjava2的源码,水平有限,就先看一下基础的用法相关,一些难度更高的操作符就慢慢来分析吧。
就按照平时使用的顺序来分析:

一、初始化Observerble

基本使用实例:

Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("aaa");
}
});

先看一下内部的参数 ObservableOnSubscribe<>() 。

public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}

就是一个接口,这里用的就是它的一个匿名实现类。而接口内部的方法中我们看到ObservableEmitter<> 是一个Rxjava2新推出的类,俗称发射器。

public interface ObservableEmitter<T> extends Emitter<T> {

/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param d the disposable, null is allowed
*/

void setDisposable(Disposable d);

/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/

void setCancellable(Cancellable c);

/**
* Returns true if the downstream disposed the sequence.
* @return true if the downstream disposed the sequence
*/

boolean isDisposed();

/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized ObservableEmitter
*/

ObservableEmitter<T> serialize();
}

这里面几个回调方法的作用注释也说的很清楚了就不多说了。
它继承自Emitter

public interface Emitter<T> {

/**
* Signal a normal value.
* @param value the value to signal, not null
*/

void onNext(@NonNull T value);

/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/

void onError(@NonNull Throwable error);

/**
* Signal a completion.
*/

void onComplete();
}

可以看到,这里面就是我们比较熟悉的next、complete、error三个回调方法了。其实这个create方法内部的参数就是两个接口的回调,理解就行了,然后看一下create方法。

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

requireNonNull是很好理解的,看名字也能猜测出是测试传进来的ObservableOnSubscribe是否为空

  public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}

而源码也验证了我们的想法。关键是后面一句,先看一下具体的方法实现。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

一句一句来分析:

public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/

@NonNull
R apply(@NonNull T t) throws Exception;
}

这里的Function也是一个接口,作用也很明显,将T类型的数据转化成R类型数据。那是我们在使用到

        observable.map(new Function<String, Object>() {
@Override
public Object apply(@NonNull String s) throws Exception {
return null;
}
})

类似这种类型转换的语句时候才会用到,这里我们先不管它,一开始是默认为null的,所以这个方法最后就会return source;就是将括号中的new ObservableCreate(source)原样返回。这个ObservableCreate又是什么呢?

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}

源码比较长我们就之看一下它的构造函数就可以了,目前只需要知道这是一个Observerble的子类就可以了,至于Observerble这个类,等到大概摸清楚了事件流程再回头来分析。所以到现在我们的第一步初始化就算是分析完了流程。

二、初始化一个Observer

用法示例:

Observer<String> observer=new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
subscription=d;
}

@Override
public void onNext(String value) {
LogUtil.log(TAG," "+value);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
LogUtil.log(TAG,"complete");
}
};

这个分析就要简单很多了,Observer只是一个简单的接口,这里也只是具体实现了一下接口回调。

public interface Observer<T> {

/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/

void onSubscribe(Disposable d);

/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/

void onNext(T t);

/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/

void onError(Throwable e);

/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/

void onComplete();

}

不过这里和Rxjava1也是有些区别的,多了一个onSubscribe 注释也说的很清楚,用于随时取消订阅。
第二步很轻松,下面看一下第三步

三、建立订阅关系

用法示例:

observable.subscribe(observer);

这里我们就只分析最简单的一种,看一下源码:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

第一句还是一样,判断是否为空,平时自己写代码也要像这样注意代码的健壮性。
重点就是这三句了。

 observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);

一句一句来看:

 public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}

在这个onsubscribe中是不是觉得有些眼熟?就跟刚刚的onAssenmbly几乎一样,由于我们没有其它的功能,所以这里onObservableSubscribe也是null,也是返回原值,下面的requireNonNull我们也见过了,又验证一遍是否为空,因为如果我们加入了Function函数,上面就不会返回原来的observer了,所以还要再验证一遍。
于是就到了最后一句

protected abstract void subscribeActual(Observer<? super T> observer);

???
怎么是个abstract方法?那么它是在哪实现的呢?
回想看我们的observable初始化过程。哪里出的问题呢?就是我们一开始没有分析的ObservableCreate,我们在初始化的时候就将一个ObservableCreate类向上转型赋值给了Observerble,所以方法的具体实现也就在ObservableCreate里了。
继续跟进。果不其然:

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

还是一句一句来看

CreateEmitter<T> parent = new CreateEmitter<T>(observer);

又是一个新的类

    static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


private static final long serialVersionUID = -3434801548987643227L;

final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}

@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}

@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}

@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}

所以我们的前两句

CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

所以我们的前两局就是回调了onSubscribe接口,从而将这个CreateEmitter类型转型成Disposable输出了。而CreateEmitter的初始化参数又是observer本身,所以大体上可以看成回调了另一个格式的自己。。。然后一般可用于自杀(取消订阅)。。。
然后就来到的最后一句

source.subscribe(parent);

这里的source就是

new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
}

刚刚我们初始化observable传入的。这个parent->这里的参数e。于是就这样完成了Observerble和Observer的绑定,也就能实现接口回调了。

没有任何其它功能,只是走了一边最基本流程的Rxjava源码,后面还会继续更新的。