Operators: Creation Operators

Date: 2022-09-16 15:14:01

     1. Purpose

  • Create an Observable object and emit events.

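     2. Types

  • The creation operators covered in this article are: create(), just(), fromArray(), fromIterable(), empty(), error(), never(), defer(), timer(), interval(), intervalRange(), range(), and rangeLong().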

 

     3. Details

          3.1  create()

    public static void create() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          Output:

08-06 10:27:59.073  8859  8859 D Operation: onSubscribe
08-06 10:27:59.073  8859  8859 D Operation: onNext: value = 1
08-06 10:27:59.073  8859  8859 D Operation: onNext: value = 2
08-06 10:27:59.073  8859  8859 D Operation: onNext: value = 3
08-06 10:27:59.073  8859  8859 D Operation: onComplete

 

          3.2   just() (see rxdocs.pdf, p. 49)

     Purpose: emits the given items in order, exactly as passed in.

     Note: just() accepts at most 10 arguments.


 

    public static void just() {
        Observable.just(1, 6, 8)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          Output:

08-06 10:29:12.812  9095  9095 D Operation: onSubscribe
08-06 10:29:12.812  9095  9095 D Operation: onNext: value = 1
08-06 10:29:12.812  9095  9095 D Operation: onNext: value = 6
08-06 10:29:12.812  9095  9095 D Operation: onNext: value = 8
08-06 10:29:12.812  9095  9095 D Operation: onComplete

 

          3.3   fromArray() (see rxdocs.pdf, p. 42)

     Purpose: emits the elements of the array that is passed in, one by one.


 

    public static void fromArray() {
        String[] array = new String[]{"I", "am", "RxJava"};
        Observable.fromArray(array)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          Output:

08-06 11:09:21.574 11135 11135 D Operation: onSubscribe
08-06 11:09:21.574 11135 11135 D Operation: onNext: value = I
08-06 11:09:21.574 11135 11135 D Operation: onNext: value = am
08-06 11:09:21.574 11135 11135 D Operation: onNext: value = RxJava
08-06 11:09:21.574 11135 11135 D Operation: onComplete

 

          3.4   fromIterable() (see rxdocs.pdf, p. 42)

     Purpose: emits the elements of the collection (for example a List) that is passed in, one by one.

    public static void fromIterable() {
        List<String> array = new ArrayList<>();
        array.add("I");
        array.add("am");
        array.add("RxJava");
        Observable.fromIterable(array)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          Output:

08-06 11:18:52.909 11428 11428 D Operation: onSubscribe
08-06 11:18:52.909 11428 11428 D Operation: onNext: value = I
08-06 11:18:52.909 11428 11428 D Operation: onNext: value = am
08-06 11:18:52.910 11428 11428 D Operation: onNext: value = RxJava
08-06 11:18:52.910 11428 11428 D Operation: onComplete

 

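          3.5   empty(), error(), never() (see rxdocs.pdf, p. 41)

// These methods are generally used for testing.

// ---- empty() ----
// Emits no items; sends only the Complete event, i.e. completes immediately.
Observable<Object> observable1 = Observable.empty();
// The observer's onComplete() is called right after onSubscribe().

// ---- error() ----
// Emits no items; sends only the Error event, i.e. fails immediately.
// The exception can be any Throwable you supply.
Observable<Object> observable2 = Observable.error(new RuntimeException());
// The observer's onError() is called right after onSubscribe().

// ---- never() ----
// Sends no events at all.
Observable<Object> observable3 = Observable.never();
// After onSubscribe(), none of onNext(), onError(), or onComplete() is ever called.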
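
To make the three event patterns concrete, here is a small library-free sketch in plain Java. It does not use RxJava: `MiniObserver`, `MiniSource`, and the three factory methods are hypothetical names that only model which callbacks each kind of source triggers, assuming (as in RxJava 2) that onSubscribe is always delivered first.

```java
import java.util.ArrayList;
import java.util.List;

public class TerminalEventsSketch {
    /** Hypothetical stand-in for an observer; this is not the RxJava interface. */
    public interface MiniObserver {
        void onSubscribe();
        void onError(Throwable e);
        void onComplete();
    }

    /** Hypothetical stand-in for a source that can be subscribed to. */
    public interface MiniSource {
        void subscribe(MiniObserver observer);
    }

    /** Models empty(): signals completion immediately. */
    public static MiniSource empty() {
        return o -> { o.onSubscribe(); o.onComplete(); };
    }

    /** Models error(): signals the given failure immediately. */
    public static MiniSource error(Throwable t) {
        return o -> { o.onSubscribe(); o.onError(t); };
    }

    /** Models never(): after onSubscribe, nothing else is ever signalled. */
    public static MiniSource never() {
        return o -> o.onSubscribe();
    }

    /** Subscribes a recording observer and returns the callbacks it received, in order. */
    public static List<String> record(MiniSource source) {
        List<String> calls = new ArrayList<>();
        source.subscribe(new MiniObserver() {
            @Override public void onSubscribe() { calls.add("onSubscribe"); }
            @Override public void onError(Throwable e) { calls.add("onError"); }
            @Override public void onComplete() { calls.add("onComplete"); }
        });
        return calls;
    }

    public static void main(String[] args) {
        System.out.println("empty: " + record(empty()));
        System.out.println("error: " + record(error(new RuntimeException("boom"))));
        System.out.println("never: " + record(never()));
    }
}
```

In a test, this is what makes the three operators handy: each one exercises exactly one path of the observer (completion, failure, or silence) without any real data.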

 

          3.6   defer() (see rxdocs.pdf, p. 38)

     Purpose: the Observable is not created until an Observer subscribes; at that moment it is created dynamically and starts emitting events.


 

    public static void defer() {
        Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                // Called once per subscription, so the Observable is built at subscribe time.
                return Observable.just(3, 6, 9);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          Output:

08-06 11:38:32.468 12057 12057 D Operation: onSubscribe
08-06 11:38:32.468 12057 12057 D Operation: onNext: value = 3
08-06 11:38:32.468 12057 12057 D Operation: onNext: value = 6
08-06 11:38:32.468 12057 12057 D Operation: onNext: value = 9
08-06 11:38:32.468 12057 12057 D Operation: onComplete
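
The practical effect of creating the source at subscription time is easiest to see when some state changes between defining the source and subscribing to it. The following is a library-free sketch in plain Java, not RxJava code: `Source`, `just`, `defer`, and `collect` are hypothetical helpers that only model the idea that the factory runs once per subscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class DeferSketch {
    /** Hypothetical stand-in for a cold source: pushes its items to the consumer on subscribe. */
    public interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Eager source: the items are fixed when the source is created. */
    public static <T> Source<T> just(List<T> items) {
        List<T> snapshot = new ArrayList<>(items);
        return onNext -> snapshot.forEach(onNext);
    }

    /** Deferred source: the factory runs at subscription time, once per subscriber. */
    public static <T> Source<T> defer(Supplier<Source<T>> factory) {
        return onNext -> factory.get().subscribe(onNext);
    }

    /** Subscribes and gathers everything the source emits. */
    public static List<Integer> collect(Source<Integer> source) {
        List<Integer> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    /** Returns what the eager and the deferred source emit after the state has changed. */
    public static List<List<Integer>> demo() {
        int[] state = {1};
        Source<Integer> eager = just(Collections.singletonList(state[0]));
        Source<Integer> lazy = defer(() -> just(Collections.singletonList(state[0])));
        state[0] = 2;  // changed after both sources were defined, before subscribing
        return Arrays.asList(collect(eager), collect(lazy));
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints [[1], [2]]
    }
}
```

The eager source still reports the old value, while the deferred one reads the state at the moment of subscription. That is the usual reason to reach for defer(): every subscriber gets data that is current when it subscribes.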

 

          3.7   timer() (see rxdocs.pdf, p. 61)

     Purpose: after the specified delay, emits a single value 0 (of type Long) and then completes. By default it runs on the computation Scheduler.


    public static void timer() {
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          Output:

08-06 11:44:25.364 12314 12314 D Operation: onSubscribe
08-06 11:44:27.366 12314 12340 D Operation: onNext: value = 0
08-06 11:44:27.368 12314 12340 D Operation: onComplete

 

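          3.8   interval() (see rxdocs.pdf, p. 47)

     Purpose: emits an event every time the specified period elapses. The values are Long numbers that start at 0 and increase by 1. In the example, interval(0, 2, TimeUnit.SECONDS) means an initial delay of 0 and a period of 2 seconds.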


    public static void interval() {
        Observable.interval(0, 2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          Output:

08-06 11:51:14.280 12641 12641 D Operation: onSubscribe
08-06 11:51:14.281 12641 12665 D Operation: onNext: value = 0
08-06 11:51:16.282 12641 12665 D Operation: onNext: value = 1
08-06 11:51:18.282 12641 12665 D Operation: onNext: value = 2
08-06 11:51:20.286 12641 12665 D Operation: onNext: value = 3
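
Note that the log above contains no onComplete: the sequence keeps going until the subscription is cancelled (in RxJava, through the Disposable received in onSubscribe). As a rough analogy only, the sketch below reproduces that behaviour with the JDK's ScheduledExecutorService instead of RxJava; `takeTicks` is a hypothetical helper, and cancelling the ScheduledFuture plays the role of dispose().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class IntervalSketch {
    /** Collects the first n ticks (0, 1, 2, ...) of a periodic timer, then cancels it. */
    public static List<Long> takeTicks(int n, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> ticks = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(n);
        AtomicLong counter = new AtomicLong();

        // Initial delay 0, then one tick per period, on the scheduler's own thread.
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {   // ignore ticks that race with the cancellation
                ticks.add(counter.getAndIncrement());
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();                // block the caller until n ticks have arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(false);        // without this the periodic task would run forever
            scheduler.shutdown();
        }
        return new ArrayList<>(ticks);
    }

    public static void main(String[] args) {
        System.out.println(takeTicks(4, 200));  // prints [0, 1, 2, 3]
    }
}
```

As in the log, the ticks arrive on a different thread from the one that started the timer, which is why the caller has to wait on a latch here.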

 

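          3.9   intervalRange()

     Purpose: like interval(), emits an event every time the specified period elapses, but also lets you specify how many values are emitted. In the example, intervalRange(6, 5, 0, 2, TimeUnit.SECONDS) emits 5 values starting at 6, with an initial delay of 0 and a period of 2 seconds.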

    public static void intervalRange() {
        Observable.intervalRange(6, 5, 0, 2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          Output:

08-06 14:14:14.095 15543 15543 D Operation: onSubscribe
08-06 14:14:14.097 15543 15567 D Operation: onNext: value = 6
08-06 14:14:16.098 15543 15567 D Operation: onNext: value = 7
08-06 14:14:18.097 15543 15567 D Operation: onNext: value = 8
08-06 14:14:20.098 15543 15567 D Operation: onNext: value = 9
08-06 14:14:22.097 15543 15567 D Operation: onNext: value = 10
08-06 14:14:22.098 15543 15567 D Operation: onComplete
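
The timestamps in this log follow a simple rule: the k-th emission (counting from 0) carries the value start + k and arrives roughly initialDelay + k * period after subscription. Here is a minimal sketch of that arithmetic in plain Java; `schedule` is a hypothetical helper that only computes the timetable and does not schedule anything.

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalRangeSchedule {
    /**
     * Returns one line per emission for the arguments (start, count, initialDelay, period),
     * with both time values given in seconds. Nothing is actually scheduled.
     */
    public static List<String> schedule(long start, long count, long initialDelay, long period) {
        List<String> lines = new ArrayList<>();
        for (long k = 0; k < count; k++) {
            long value = start + k;                   // values increase by 1
            long offset = initialDelay + k * period;  // seconds after subscription
            lines.add("t+" + offset + "s -> " + value);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Same arguments as intervalRange(6, 5, 0, 2, TimeUnit.SECONDS) in the example.
        schedule(6, 5, 0, 2).forEach(System.out::println);
    }
}
```

For these arguments the timetable is t+0s, t+2s, t+4s, t+6s, and t+8s for the values 6 through 10, which lines up with the 14:14:14 to 14:14:22 timestamps in the log.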

 

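          3.10   range() (see rxdocs.pdf, p. 51)

     Purpose: emits a consecutive sequence of integers in a specified range. It is similar to intervalRange(), except that the events are emitted without any delay. In the example, range(6, 5) emits 5 values starting at 6.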


 

    public static void range() {
        Observable.range(6, 5)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          Output:

08-06 14:27:52.744 17569 17569 D Operation: onSubscribe
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 6
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 7
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 8
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 9
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 10
08-06 14:27:52.745 17569 17569 D Operation: onComplete

 

          3.11   rangeLong()

     Purpose: like range(), except that the emitted values are of type Long.
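
Since this operator has no example here, the sketch below models just the value sequence in plain Java, without RxJava: count consecutive long values beginning at start. The `rangeLong` helper is a hypothetical stand-in built on LongStream, and rejecting a negative count is an assumption of this sketch.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class RangeLongSketch {
    /** Models the value sequence only: count consecutive longs beginning at start. */
    public static List<Long> rangeLong(long start, long count) {
        if (count < 0) {
            // Assumption of this sketch: a negative count is treated as a caller error.
            throw new IllegalArgumentException("count must be >= 0, was " + count);
        }
        return LongStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Values beyond the int range are fine because everything is a long.
        System.out.println(rangeLong(Integer.MAX_VALUE, 3));
        // prints [2147483647, 2147483648, 2147483649]
    }
}
```

This is the case where the Long variant matters: a sequence that runs past Integer.MAX_VALUE cannot be represented with Integer values.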