Rxjava+Retrofit取消网络请求

时间:2022-09-30 17:45:56

一、解决办法
刚开始接触Rxjava的朋友可能不知道怎么取消网络请求。
其实直接调用unsubscribe()就可以了。
下面是验证测试

private OkHttpClient okHttpClient;
private Retrofit retrofit;

public interface HttpLogin {
@POST("account/login")
retrofit2.Call<HashMap<String, Object>> login(@Body Account account);
}

@Before
public void setUp() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(new MyLogger());
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(loggingInterceptor);
okHttpClient = builder.build();

retrofit = new Retrofit.Builder().client(okHttpClient).baseUrl("http://localhost:8080/campus/")
.addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.newThread()))
.addConverterFactory(JacksonConverterFactory.create())
.build();
}

@Test
public void testRxJava() {
final Subscription subscription = retrofit
.create(HttpLoginRx.class)
.login(new Account("yincs", "123456"))
.subscribe(new Subscriber<HashMap<String, Object>>() {

public void onCompleted() {
System.out.println("onCompleted");
}

public void onError(Throwable arg0) {
System.out.println("onError " + arg0.getMessage());
}

public void onNext(HashMap<String, Object> arg0) {
System.out.println("onNext " + arg0.get("des"));
}
});
subscription.unsubscribe();

try {
//因为网络请求在子线程,所以不能让宿主线程过早的结束。
Thread.sleep(120000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}

返回结果:
message:–> POST http://localhost:8080/campus/account/login http/1.1
message:Content-Type: application/json; charset=UTF-8
message:Content-Length: 48
message:
message:{“account”:”yincs”,”passwd”:”123456”,”userid”:0}
message:–> END POST (48-byte body)
message:<– HTTP FAILED: java.io.IOException: Canceled

看最后一行。这是okhttp抛出的异常。
在okhttp3.RealCall.java中可以看到下面这段代码

  @Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
//如果请求取消了就抛出一个该异常
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}

二、实现原理
在上面创建retrofit的时候,我们传入了一个RxJavaCallAdapterFactory的适配器工厂。
RxJavaCallAdapterFactory.createWithScheduler(Schedulers.newThread())这时传入了一个Scheduler线程控件器。
我们知道Retrofit最后是通过CallAdapter.get方法创建适配器,然后再通过这个适配器.adapt方法创建返回对象(这里也就是创建Observable)
现在看下retrofit2.adapter.rxjava.RxJavaCallAdapterFactory.get()方法

 @Override
public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
Class<?> rawType = getRawType(returnType);
String canonicalName = rawType.getCanonicalName();
//isSingle = flase
boolean isSingle = "rx.Single".equals(canonicalName);
//isCompletable = flase
boolean isCompletable = "rx.Completable".equals(canonicalName);
if (rawType != Observable.class && !isSingle && !isCompletable) {
return null;
}
if (!isCompletable && !(returnType instanceof ParameterizedType)) {
String name = isSingle ? "Single" : "Observable";
throw new IllegalStateException(name + " return type must be parameterized"
+ " as " + name + "<Foo> or " + name + "<? extends Foo>");
}

if (isCompletable) {
// Add Completable-converter wrapper from a separate class. This defers classloading such that
// regular Observable operation can be leveraged without relying on this unstable RxJava API.
// Note that this has to be done separately since Completable doesn't have a parametrized
// type.
return CompletableHelper.createCallAdapter(scheduler);
}

CallAdapter<Observable<?>> callAdapter = getCallAdapter(returnType, scheduler);
if (isSingle) {
// Add Single-converter wrapper from a separate class. This defers classloading such that
// regular Observable operation can be leveraged without relying on this unstable RxJava API.
return SingleHelper.makeSingle(callAdapter);
}
return callAdapter;
}

上面可以看出是直接返回的getCallAdapter(returnType, scheduler);

  private CallAdapter<Observable<?>> getCallAdapter(Type returnType, Scheduler scheduler) {
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
Class<?> rawObservableType = getRawType(observableType);
//返回参数里的泛型,这里上面的接口返回的泛型类型是一个HashMap
if (rawObservableType == Response.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Response must be parameterized"
+ " as Response<Foo> or Response<? extends Foo>");
}
Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
return new ResponseCallAdapter(responseType, scheduler);
}
//返回参数里的泛型,这里上面的接口返回的泛型类型是一个HashMap
if (rawObservableType == Result.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Result must be parameterized"
+ " as Result<Foo> or Result<? extends Foo>");
}
Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
return new ResultCallAdapter(responseType, scheduler);
}
//返回了个SimpleCallAdapter
return new SimpleCallAdapter(observableType, scheduler);
}

好,继续看下SimpleCallAdapter类的源码,我们一开始的目标是要找到adapt方法。不多说直接看

  static final class SimpleCallAdapter implements CallAdapter<Observable<?>> {  
private final Type responseType;
private final Scheduler scheduler;

SimpleCallAdapter(Type responseType, Scheduler scheduler) {
this.responseType = responseType;
this.scheduler = scheduler;
}

@Override public Type responseType() {
return responseType;
}

@Override public <R> Observable<R> adapt(Call<R> call) {
Observable<R> observable = Observable.create(new CallOnSubscribe<>(call))
//异常处理的一个订阅操作。即如果产生了网络异常之类的,就回调订阅者的onError操作
.lift(OperatorMapResponseToBodyOrError.<R>instance());
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
}

上面可以看到Observable观察了了一个CallOnSubscribe订阅者。这里需要对Rxjava机制有大致了解

static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
private final Call<T> originalCall;

CallOnSubscribe(Call<T> originalCall) {
this.originalCall = originalCall;
}

@Override public void call(final Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();

// Wrap the call in a helper which handles both unsubscription and backpressure.
RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
subscriber.add(requestArbiter);
subscriber.setProducer(requestArbiter);
}
}

继续看RequestArbiter

static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {     
private final Call<T> call;
private final Subscriber<? super Response<T>> subscriber;

RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
this.call = call;
this.subscriber = subscriber;
}

@Override public void request(long n) {
if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
if (n == 0) return; // Nothing to do when requesting 0.
if (!compareAndSet(false, true)) return; // Request was already triggered.

try {
Response<T> response = call.execute();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
return;
}

if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
//取消订阅的时候就调用了retrofit2.OkHttpCall.cancel()方法
//该方法也就执行了okhttp里call.cancel方法取消网络请求了。
@Override public void unsubscribe() {
call.cancel();
}

@Override public boolean isUnsubscribed() {
return call.isCanceled();
}
}

三、总结。
完~