RxJava 概述
A library for composing asynchronous and event-based programs using observable sequences for the Java VM”
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
概念
RxJava中观察者模式概念
Observable-可观察者,即被观察者
Observer-观察者
subscribe-订阅事件。Observable 和 Observer 通过 subscribe() 。
观察者/订阅者的回调事件
onNext(T t)正常的回调事件
onError(Throwable e) 异常发生时的回调事件
onCompleted() 完成时的回调事件
Observable 几种常见的构造方法
create(new Observable.OnSubscribe())
基础的构造方法
just(T…)
等同于onNext(t1,t2..)的调用
from(T[])
等同于
for(int i=0;i<i.length;i++){ onNext(t[i]); }
|
的调用
示例
@SmallTest public void testRxJava() { //创建观察者 Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { Log.d(TAG, "completed"); } @Override public void onError(Throwable e) { Log.d(TAG, "error"); e.printStackTrace(); } @Override public void onNext(String s) { Log.d(TAG, "get string ==" + s); } }; //创建订阅者 订阅者是观察者的一个实现类 封装了onStart() unsubscribe()等方法 Subscriber<String> stringSubscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.d(TAG, "completed"); } @Override public void onError(Throwable e) { Log.d(TAG, "error"); e.printStackTrace(); } @Override public void onNext(String s) { Log.d(TAG, "get string ==" + s); } }; //构造被观察者 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("simple"); subscriber.onNext("test"); subscriber.onNext("try"); } }); //订阅观察者 Log.d(TAG, "observer subscribing"); observable.subscribe(observer); observable.subscribe(observer); }
|
Action
Action1、Action2、……、ActionN 是RxJava的一系列接口,只包含一个call()方法,Subscriber可以接受这一些列参数构造,产生不完整的回调。
示例
@MediumTest public void testAction(){ Action0 action=new Action0() { @Override public void call() { } }; Action1<String> action1=new Action1<String>() { @Override public void call(String s) { Log.d(TAG,"action1 get == "+ s); } }; Observable<String> observable=Observable.just("foo","bar","fooo","baar"); observable.subscribe(action1); }
|
此外,还有三种方法重载
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
|
分别为支持onError、onComplted()的构造方式,可以看出,第二个方法和new Subscriber()已经并无区别了
Func
如同Action,Func1,Func2,…,FuncN为Rx提供的变换函数,所谓变换就是处理加工数据的过程,如把传入的URI转换成response或是将传入的资源转换为Bitmap
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6)
|
为Observable设置Fuc的方法,一个具体的示例如下
@MediumTest public void testFunc() { Observable.just(45, 48, 78, 147, 88, 488) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return String.valueOf(integer); } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.d(TAG, s); } }); }
|
此外,还有public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func)
这个方法来设置将T 转换为R[]的一转多方法。
Scheduler
概述
Scheduler为RxJava的线程调度起,指定任务执行的线程
几个内置的Scheduler
- Schedulers.immediate():不指定运行线程直接在当前线程运行,默认设置。
- Schedulers.newThread(): 在新线程执行操作。
- Schedulers .io(): I/O 线程,I/O操作所使用的 Scheduler。 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。
- Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,使用大小为CUP核心数的的线程池。
- AndroidSchedulers.mainThread():在Android 主线程即UI线程运行。
几个重要函数
public final Observable<T> observeOn(Scheduler scheduler) public final Observable<T> subscribeOn(Scheduler scheduler)
|
分别为指定被观察者call()方法发生线程和订阅者事件回调线程。