RxJava2.0中just操作符用法和源码分析(二)

时间:2021-09-18 17:49:55

just基本使用

just是用来创建一个Observable来发射指定值,可以将任何对象转化为一个用来被发射的ObservableSource数据。这个方法与fromArray相似,除了from会将数组或者Iterable中的元素逐个取出然后在逐个依次发射,而just会直接将数组或者Iterable对象作为单个数据来发射。
RxJava2.0中just操作符用法和源码分析(二)

实例代码:

Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable.just(items).subscribe(new Consumer<Integer[]>() {
@Override
public void accept(Integer[] integers) throws Exception {
for (int i : integers) {
println("accept : onNext : " + i + "\n");
}
}
});

just接收一个用于发射的数据,我们查看源码发现just有许多重载的方法:

just(T item)

just(T item1, T item2)

just(T item1, T item2, T item3)

just(T item1, T item2, T item3, T item4)

just(T item1, T item2, T item3, T item4, T item5)

just(T item1, T item2, T item3, T item4, T item5, T item6)

just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)

just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)

just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)

just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

上面不同的重载方法分别用来接收不同数量的item,然后分别发射这些item。我们在看just方法中的实现:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");

return fromArray(item1, item2);
}

public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

当just中接收的是一个item时,直接调用RxJavaPlugins.onAssembly(new ObservableJust(item)),如果是多个item,那么会先去判断item的合法性,然后再去调用RxJavaPlugins.onAssembly(new ObservableFromArray(items)).我们来逐个分析,先看单个item的调用:

Observable#just(T item)

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

首先通过参数item,生成一个ObservableJust的对象,该类比较简单如下:

ObservableJust

/**
* Represents a constant scalar value.
* @param <T> the value type
*/

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

private final T value;
public ObservableJust(final T value) {
this.value = value;
}

@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}

@Override
public T call() {
return value;
}
}

将上面生成的ObservableJust类对象,作为onAssembly方法参数,进行装配:

RxJavaPlugins#onAssembly

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

到这里,我们知道会返回该Observable对象,而这个对象具体实现类就是ObservableJust对象。
完成了Observable初始化后,我们开始订阅以个观察者对象。在这里我选择了Consumer对象。我们来看看这个类:

public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/

void accept(@NonNull T t) throws Exception;
}

这个类十分简单,是标准的函数式接口,里面只有以个accept方法。现在我们重点来看看Observable中的subscribe方法。这里面同样对该方法进行了重载:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;
}

当你使用Consumer作为参数时,最多可以接收4个参数:
Consumer

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

这里主要是执行了subscribeActual方法,而这个方法在Observable中是个抽象的方法,前面我们已经知道Observable的具体实现类是ObservableJust类,所以具体执行的也是这个类中的方法:

ObservableJust#subscribeActual

@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}

方法里面首先创建了一个ScalarDisposable对象,然后将它作为参数执行传入的Observer的onSubscribe方法。
而我们又知道Observer对象就是刚才创建的LambdaObserver对象。

LambdaObserver#onSubscribe

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

......

public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this, s)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
onError(ex);
}
}
}

......
}

从这里可以知道onSubscribe方法中,其实是调用的onSubscribe实例中的accept方法。而这个onSubscribe又是在LambdaObserver实例化时,传入的第四个参数。这里将ObservableJust中subscribeActual方法里面的创建的ScalarDisposable对象,通过该方法回调传出来,用来观察ObservableJust的状态是否被断开。
接下来将会执行ScalarDisposable中的run方法:

public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {

private static final long serialVersionUID = 3880992722410194083L;

final Observer<? super T> observer;

final T value;

static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;

public ScalarDisposable(Observer<? super T> observer, T value) {
this.observer = observer;
this.value = value;
}

.....

@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}

上面的分析我们知道这个observer其实就是LambdaObserver的实例化对象。通过它来执行LambdaObserver中的onNext方法,当完成发送时再去执行observer.onComplete()

LambdaObserver#onNext

@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}

@Override
public void onError(Throwable t) {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
}

@Override
public void onComplete() {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}

而这里的onNext对象就是传入的第一个参数Consumer对象。于是第一个参数的Consumer的回调参数得到了执行。如果出现异常则会调用onError方法,该方法里面通过onError(第二个参数)的回调方法accept将错误信息传递出来。当执行完毕后就会调用onComplete方法,然后通过onComplete(第三个参数Action)回调run方法。到这里就完成了对just操作符的分析。

下面列出just的简单demo:

Observable.just(1,2,3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
println("accept : onNext : " + integer + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
println("accept : throwable : " + throwable.getMessage() + "\n");
}
}, new Action() {
@Override
public void run() throws Exception {
println("accept : run" + "\n");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
println("accept : disposable : " + disposable.isDisposed() + "\n");
}
});


执行结果:

accept : disposable : false

accept : onNext : 1

accept : onNext : 2

accept : onNext : 3

accept : run