RxAndroid 学习3部曲之终极篇- RXjava,RxAndroid使用

  |   0 评论   |   1,263 浏览

Rx是响应式编程的意思, 本质是观察者模式, 是以观察者(Observer)和订阅者(Subscriber)为基础的异步响应方式. 在Android编程时, 经常会使用后台线程, 那么就可以使用这种方式. 目前的异步编程方式都会导致一些问题.

RxAndroid来源于RxJava, 在RxJava的基础上扩展了一些Android的功能, 已经发布1.2版本, 让我们来看看怎么用吧.

环境准备:配置build.gradle, 添加RxAndroid库和Lamada表达式支持

apply plugin: 'com.android.application'
apply plugin: 'me.tatarka.retrolambda'
android {
    ...
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
    ...
}

dependencies {
   ...
    //rx
    compile 'io.reactivex:rxjava:1.1.0'
    compile 'io.reactivex:rxandroid:1.1.0'
    //other 
}
//Java控制台乱码
tasks.withType(JavaCompile) {
    options.encoding = "UTF-8"
}

项目根节点下的build.gradle导入:

classpath 'me.tatarka:gradle-retrolambda:2.5.0'

定义一个RxAndroid 的工具类RxUtils:

package cn.istarvip.rxandroid3;

import android.util.Log;

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/**
 * User:郑禄秀
 * Site:istarvip.cn
 * Date: 2016-06-17
 * Time: 11:57
 * FIXME 开始使用RxAndroid 的具体实例
 */
public class RxUtils {
    public static final String TAG = RxUtils.class.getSimpleName();
    private static void getUser(){

    }
    //创建被观察者
    public static void createObservalbe() {
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //判断是否订阅
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext("hello");
                    subscriber.onNext("Jack");
                    subscriber.onNext(download());
                    subscriber.onNext("world");
                    //完成
                    subscriber.onCompleted();
                }
            }
        });
        //创建订阅事件
        Subscriber<String> showSub = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }
            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNEXT:---"+s);
            }
        };
        observable.subscribe(showSub);//关联被观察者
    }
    private static String download() {
        return "正在下载》》》";
    }
    //连着创建
    public static  void create2(){
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (!subscriber.isUnsubscribed()){
                    for (int i=0; i<10;i++){
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                }
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }
            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNEXT:---"+integer);
            }
        });
    }
    //使用在被观察者,返回的对象一般都是数值类型
    public static void form(){
        Integer[] items={1,3,4,2,4,5,3,4,8};
        Observable observable=Observable.from(items);
        observable.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                Log.e(TAG,o.toString());
            }
        });
    }
        //指定某一时刻进行数据发送
    public static void  interval(){
        //延迟时间 时间间隔 时间单位
        Observable observable=Observable.interval(1,1, TimeUnit.SECONDS);//每一秒发送数据
        observable.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                Log.e(TAG,o.toString());
            }
        });
    }

    //just方式:just方法可以接受1-10个参数
    public static void just(){
        Integer[] items1 = {1, 2, 3, 4, 5, 6};
        Integer[] items2 = {3, 5, 6, 8, 3, 8};
        Observable observable=Observable.just(items1,items2);
        observable.subscribe(new Subscriber<Integer[]>() {

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }

            @Override
            public void onNext(Integer[] integers) {
                for(int i=0;i<integers.length;i++){
                    Log.i(TAG,"next:"+integers[i]);
                }
            }
        });
    }

    //使用范围数据,指定输出数据的范围
    public static void range(){
        //开始数值,数值count
        Observable observable=Observable.range(24,43);
        observable.subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG,"next:"+ integer);
            }
        });
    }
    //使用过滤功能
    public static void filter(){
        //Integer[] items={1,3,5,4,43,52, 34,33};
        Observable observable=Observable.just(1,2,32,43,324,2,24);
        observable.filter(new Func1<Integer,Boolean>() {
            @Override
            public Boolean call(Integer integers) {
                return integers<53;
            }

        }).observeOn(Schedulers.io()).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }

            @Override
            public void onNext(Object o) {
                Log.i(TAG,"next:"+ o.toString());
            }
        });
    }
}

接下来就是在Activity中调用了

click.png

响应式方式执行, 使用IO线程处理, 主线程响应, 也可以使用其他线程处理, 如Schedulers.io()处理IO的线程,Schedulers.computation()计算的线程, Schedulers.newThread()新创建的线程.

使用响应式编程可以更好的处理内存泄露问题, 代码也更加优雅和可读, 选择执行线程和监听线程也更加方便. 在destroy时, 可以关闭正在执行的异步任务. 还有一些其他优势, 就参考网站吧.

至此RxAndroid的学习讲解就结束了,更多请查看官方文档。

RxJava与RxAndroid学习资源

木水川 

                                                by 猿码阁

>