RxJava 概述

A library for composing asynchronous and event-based programs using observable sequences for the Java VM”

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

概念

RxJava中观察者模式概念

Observable-可观察者,即被观察者

Observer-观察者

subscribe-订阅事件。Observable 和 Observer 通过 subscribe() 。

观察者/订阅者的回调事件

onNext(T t)正常的回调事件

onError(Throwable e) 异常发生时的回调事件

onCompleted() 完成时的回调事件

Observable 几种常见的构造方法

create(new Observable.OnSubscribe())

基础的构造方法

just(T…)

等同于onNext(t1,t2..)的调用

from(T[])

等同于

for(int i=0;i<i.length;i++){
onNext(t[i]);
}

的调用

示例

@SmallTest
public void testRxJava() {
//创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
e.printStackTrace();
}
@Override
public void onNext(String s) {
Log.d(TAG, "get string ==" + s);
}
};
//创建订阅者 订阅者是观察者的一个实现类 封装了onStart() unsubscribe()等方法
Subscriber<String> stringSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
e.printStackTrace();
}
@Override
public void onNext(String s) {
Log.d(TAG, "get string ==" + s);
}
};
//构造被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("simple");
subscriber.onNext("test");
subscriber.onNext("try");
}
});
//订阅观察者
Log.d(TAG, "observer subscribing");
observable.subscribe(observer);
observable.subscribe(observer);
}

Action

Action1、Action2、……、ActionN 是RxJava的一系列接口,只包含一个call()方法,Subscriber可以接受这一些列参数构造,产生不完整的回调。
示例

@MediumTest
public void testAction(){
Action0 action=new Action0() {
@Override
public void call() {
}
};
Action1<String> action1=new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG,"action1 get == "+ s);
}
};
Observable<String> observable=Observable.just("foo","bar","fooo","baar");
observable.subscribe(action1);
}

此外,还有三种方法重载

public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)

分别为支持onError、onComplted()的构造方式,可以看出,第二个方法和new Subscriber()已经并无区别了

Func

如同Action,Func1,Func2,…,FuncN为Rx提供的变换函数,所谓变换就是处理加工数据的过程,如把传入的URI转换成response或是将传入的资源转换为Bitmap

public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6)

为Observable设置Fuc的方法,一个具体的示例如下

@MediumTest
public void testFunc() {
Observable.just(45, 48, 78, 147, 88, 488)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, s);
}
});
}

此外,还有public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func)这个方法来设置将T 转换为R[]的一转多方法。

Scheduler

概述

Scheduler为RxJava的线程调度起,指定任务执行的线程

几个内置的Scheduler

  • Schedulers.immediate():不指定运行线程直接在当前线程运行,默认设置。
  • Schedulers.newThread(): 在新线程执行操作。
  • Schedulers .io(): I/O 线程,I/O操作所使用的 Scheduler。 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。
  • Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,使用大小为CUP核心数的的线程池。
  • AndroidSchedulers.mainThread():在Android 主线程即UI线程运行。

几个重要函数

public final Observable<T> observeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler)

分别为指定被观察者call()方法发生线程和订阅者事件回调线程。