rxjava第一篇 rxjava基本用法

时间:2023-01-28 17:49:57

1.rxjava基本实现:

配置:

androidStudio中配置gradle

dependencies {
...
compile 'io.reactivex:rxjava:1.2.0'
compile'io.reactivex:rxjava:1.2.1'
}

rxjava使用分为3步:


(1)创建观察者observer
(2)创建被观察者observable
(3)订阅subscribe

 //创建观察者1
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext" + s);
}

};
//创建观察者2
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext" + s);
}
};
//创建被观察者1
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("变形金刚");
subscriber.onNext("X战警");
subscriber.onCompleted();

}
});

String[] s = {"sss", "sss"};
//创建被观察者2
Observable observable1 = Observable.from(s);
//创建被观察者3
Observable observable2 = Observable.just("sss", "sss");
//订阅
Log.d(TAG, "\n--------------------------\n" +
"订阅1\n");
observable.subscribe(subscriber);
Log.d(TAG, "\n--------------------------\n" +
"订阅2\n");
observable.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"订阅3\n");
observable1.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"订阅4\n");
observable2.subscribe(observer);


2.rxjava的不完整定义回调:

使用Action()方法

其中Action后的数字代表回调的参数类型数量,以上用代码实现用不完整定义回调可以这样是写:
 //不完整回调
Action1<String> nextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "onNext" + s);
}
};
Action1<Throwable> errorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d(TAG, "onError");
}
};
Action0 onCompleteAction = new Action0() {
@Override
public void call() {
Log.d(TAG, "onComplete");
}
};
Log.d(TAG, "\n--------------------------\n" +
"不完整回调示例\n");
observable.subscribe(nextAction, errorAction, onCompleteAction);

3.rxjava的subject

Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

Subject的种类

针对不同的场景一共有四种类型的Subject

AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。 

BehaviorSubject

当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。

ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。

如果你把ReplaySubject当作一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。

PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始Observable的所有数据都被分发,你需要这样做:或者使用Create创建那个Observable以便手动给它引入"冷"Observable的行为(当所有观察者都已经订阅时才开始发射数据),或者改用ReplaySubject。 

 Log.d(TAG, "\n--------------------------\n" +
"asyncSubject示例");
AsyncSubject asyncSubject=AsyncSubject.create();
asyncSubject.onNext("asyncSubject 大王来巡山1");
asyncSubject.onNext("asyncSubject 大王来巡山2");
asyncSubject.onNext("asyncSubject 大王来巡山3");
asyncSubject.onCompleted();
asyncSubject.subscribe(observer);

Log.d(TAG, "\n--------------------------\n" +
"replaySubject示例\n");
ReplaySubject replaySubject=ReplaySubject.create();
replaySubject.onNext("replaySubject 大王来巡山1");
replaySubject.onNext("replaySubject 大王来巡山2");
replaySubject.onNext("replaySubject 大王来巡山3");
replaySubject.onCompleted();
replaySubject.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"behaviorSubject示例\n");
BehaviorSubject behaviorSubject=BehaviorSubject.create();
behaviorSubject.onNext("behaviorSubject 大王来巡山1");
behaviorSubject.onNext("behaviorSubject 大王来巡山2");
behaviorSubject.onNext("behaviorSubject 大王来巡山3");
// behaviorSubject.onCompleted();
behaviorSubject.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"publishSubject示例\n");
PublishSubject publishSubject=PublishSubject.create();
publishSubject.subscribe(observer);
publishSubject.onNext("behaviorSubject 大王来巡山1");
publishSubject.onNext("behaviorSubject 大王来巡山2");
publishSubject.onNext("behaviorSubject 大王来巡山3");
publishSubject.onCompleted();


rxjava完整实例代码与运行结果:

package com.wiparking.rxjava_learn;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.AsyncSubject;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;

public class MainActivity extends AppCompatActivity {

private String TAG = "callback";

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//创建观察者1
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext" + s);
}

};
//创建观察者2
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext" + s);
}
};
//创建被观察者1
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("变形金刚");
subscriber.onNext("X战警");
subscriber.onCompleted();

}
});

String[] s = {"sss", "sss"};
//创建被观察者2
Observable observable1 = Observable.from(s);
//创建被观察者3
Observable observable2 = Observable.just("sss", "sss");
//订阅
Log.d(TAG, "\n--------------------------\n" +
"订阅1\n");
observable.subscribe(subscriber);
Log.d(TAG, "\n--------------------------\n" +
"订阅2\n");
observable.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"订阅3\n");
observable1.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"订阅4\n");
observable2.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"asyncSubject示例");
AsyncSubject asyncSubject=AsyncSubject.create();
asyncSubject.onNext("asyncSubject 大王来巡山1");
asyncSubject.onNext("asyncSubject 大王来巡山2");
asyncSubject.onNext("asyncSubject 大王来巡山3");
asyncSubject.onCompleted();
asyncSubject.subscribe(observer);

Log.d(TAG, "\n--------------------------\n" +
"replaySubject示例\n");
ReplaySubject replaySubject=ReplaySubject.create();
replaySubject.onNext("replaySubject 大王来巡山1");
replaySubject.onNext("replaySubject 大王来巡山2");
replaySubject.onNext("replaySubject 大王来巡山3");
replaySubject.onCompleted();
replaySubject.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"behaviorSubject示例\n");
BehaviorSubject behaviorSubject=BehaviorSubject.create();
behaviorSubject.onNext("behaviorSubject 大王来巡山1");
behaviorSubject.onNext("behaviorSubject 大王来巡山2");
behaviorSubject.onNext("behaviorSubject 大王来巡山3");
// behaviorSubject.onCompleted();
behaviorSubject.subscribe(observer);
Log.d(TAG, "\n--------------------------\n" +
"publishSubject示例\n");
PublishSubject publishSubject=PublishSubject.create();
publishSubject.subscribe(observer);
publishSubject.onNext("behaviorSubject 大王来巡山1");
publishSubject.onNext("behaviorSubject 大王来巡山2");
publishSubject.onNext("behaviorSubject 大王来巡山3");
publishSubject.onCompleted();

//内部类写法
Log.d(TAG, "\n--------------------------\n" +
"内部类写法示例\n");
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("变形金刚");
subscriber.onNext("X战警");
subscriber.onCompleted();

}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext" + s);
}
});


//不完整回调
Action1<String> nextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "onNext" + s);
}
};
Action1<Throwable> errorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d(TAG, "onError");
}
};
Action0 onCompleteAction = new Action0() {
@Override
public void call() {
Log.d(TAG, "onComplete");
}
};
Log.d(TAG, "\n--------------------------\n" +
"不完整回调示例\n");
observable.subscribe(nextAction, errorAction, onCompleteAction);
}
}
运行结果:
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
订阅1
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: onNext变形金刚
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: onNextX战警
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: onCompleted
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
订阅2
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: onNext变形金刚
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: onNextX战警
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: onCompleted
12-25 14:41:23.494 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
订阅3
12-25 14:41:23.495 19843-19843/com.wiparking.rxjava_learn D/callback: onNextsss
12-25 14:41:23.495 19843-19843/com.wiparking.rxjava_learn D/callback: onNextsss
12-25 14:41:23.495 19843-19843/com.wiparking.rxjava_learn D/callback: onCompleted
12-25 14:41:23.495 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
订阅4
12-25 14:41:23.495 19843-19843/com.wiparking.rxjava_learn D/callback: onNextsss
12-25 14:41:23.495 19843-19843/com.wiparking.rxjava_learn D/callback: onNextsss
12-25 14:41:23.495 19843-19843/com.wiparking.rxjava_learn D/callback: onCompleted
12-25 14:41:23.495 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
asyncSubject示例
12-25 14:41:23.497 19843-19843/com.wiparking.rxjava_learn D/callback: onNextasyncSubject 大王来巡山3
12-25 14:41:23.497 19843-19843/com.wiparking.rxjava_learn D/callback: onCompleted
12-25 14:41:23.497 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
replaySubject示例
12-25 14:41:23.498 19843-19843/com.wiparking.rxjava_learn D/callback: onNextreplaySubject 大王来巡山1
12-25 14:41:23.498 19843-19843/com.wiparking.rxjava_learn D/callback: onNextreplaySubject 大王来巡山2
12-25 14:41:23.498 19843-19843/com.wiparking.rxjava_learn D/callback: onNextreplaySubject 大王来巡山3
12-25 14:41:23.498 19843-19843/com.wiparking.rxjava_learn D/callback: onCompleted
12-25 14:41:23.498 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
behaviorSubject示例
12-25 14:41:23.498 19843-19843/com.wiparking.rxjava_learn D/callback: onNextbehaviorSubject 大王来巡山3
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
publishSubject示例
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: onNextbehaviorSubject 大王来巡山1
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: onNextbehaviorSubject 大王来巡山2
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: onNextbehaviorSubject 大王来巡山3
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: onCompleted
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
内部类写法示例
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: onNext变形金刚
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: onNextX战警
12-25 14:41:23.499 19843-19843/com.wiparking.rxjava_learn D/callback: onCompleted
12-25 14:41:23.500 19843-19843/com.wiparking.rxjava_learn D/callback: --------------------------
不完整回调示例
12-25 14:41:23.500 19843-19843/com.wiparking.rxjava_learn D/callback: onNext变形金刚
12-25 14:41:23.500 19843-19843/com.wiparking.rxjava_learn D/callback: onNextX战警
12-25 14:41:23.500 19843-19843/com.wiparking.rxjava_learn D/callback: onComplete