Reference: http://www.jianshu.com/p/19cac3c5b106
How to use RxJava in Android
compile 'io.reactivex:rxandroid:1.2.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex:rxjava:1.1.6'
|
Basic usage of RxJava
1) Create an Observer
The Observer is the watcher: it decides what happens when an event fires. The Observer interface in RxJava is implemented like this:
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }
    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }
    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
|
Besides the Observer interface, RxJava also ships an abstract class that implements Observer: Subscriber. Subscriber extends the Observer interface in a few ways, but the basic usage is exactly the same:
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }
    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }
    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
|
2) Create an Observable
The Observable is the party being observed: it decides when events fire and which events fire. RxJava uses the create() method to create an Observable and define its event-emission rules:
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});
|
just() and from()
Observable<String> observable = Observable.just("Hello", "Hi", "Aloha");
// This calls, in order:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
|
String[] words = {"Hello", "Hi", "Aloha"};
Observable<String> observable = Observable.from(words);
// This calls, in order:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
|
3) Subscribe
Once the Observable and the Observer exist, connect them with subscribe() and the whole chain starts working. The code is simple:
Observable.subscribe(Subscriber)
|
Partially defined callbacks
Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};
// Automatically creates a Subscriber and uses onNextAction to define onNext()
observable.subscribe(onNextAction);
// Automatically creates a Subscriber and uses onNextAction and onErrorAction to define onNext() and onError()
observable.subscribe(onNextAction, onErrorAction);
// Automatically creates a Subscriber and uses onNextAction, onErrorAction and onCompletedAction to define onNext(), onError() and onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
|
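Scheduler
When no thread is specified, RxJava follows the principle of thread invariance: events are produced on whichever thread calls subscribe(), and consumed on whichever thread produces them. To switch threads, you need a Scheduler.
1) The Scheduler API (part 1)
In RxJava, a Scheduler acts as a thread controller: RxJava uses it to specify which thread each piece of code runs on. RxJava ships several built-in Schedulers that cover most use cases:
Schedulers.immediate(): runs directly on the current thread, equivalent to not specifying a thread. This is the default Scheduler.
Schedulers.newThread(): always starts a new thread and runs the operation on it.
Schedulers.io(): the Scheduler for I/O work (reading and writing files, database access, network exchange, and so on). It behaves much like newThread(), except that io() is backed by an unbounded thread pool that can reuse idle threads, so in most cases io() is more efficient than newThread(). Do not put computation work on io(), to avoid creating unnecessary threads.
Schedulers.computation(): the Scheduler for computation, meaning CPU-intensive work whose performance is not limited by I/O, such as graphics calculations. It uses a fixed thread pool whose size equals the number of CPU cores. Do not put I/O work on computation(), or the I/O wait time will waste CPU.
In addition, Android has a dedicated AndroidSchedulers.mainThread(), which runs the specified operation on the Android main thread.
With these Schedulers, you can control threading through two methods, subscribeOn() and observeOn():
subscribeOn(): specifies the thread on which subscribe() happens, that is, the thread on which Observable.OnSubscribe is activated. Also called the event-producing thread.
observeOn(): specifies the thread the Subscriber runs on. Also called the event-consuming thread.
Observable.just(1, 2, 3, 4)
        .subscribeOn(Schedulers.io()) // subscribe() happens on the I/O thread
        .observeOn(AndroidSchedulers.mainThread()) // the Subscriber's callbacks run on the main thread
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.d(tag, "number:" + number);
            }
        });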
|
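The split between the producing thread and the consuming thread can be tried outside Android with plain JDK executors. This is only an analogy, not RxJava: an ExecutorService stands in for a Scheduler, and the class name ThreadSwitchSketch and the thread names "io-like" and "main-like" are made up for this sketch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy only: ExecutorService stands in for an RxJava Scheduler.
class ThreadSwitchSketch {

    /** Produces 1..4 on one thread and consumes them on another; returns what each step saw. */
    static List<String> run() {
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-like"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(4);

        // Comparable to subscribeOn(): the emitting loop runs on the producer thread.
        producer.execute(() -> {
            for (int i = 1; i <= 4; i++) {
                final int number = i;
                final String producedOn = Thread.currentThread().getName();
                // Comparable to observeOn(): each item is handed over to the consumer thread.
                consumer.execute(() -> {
                    log.add(number + " produced on " + producedOn
                            + ", consumed on " + Thread.currentThread().getName());
                    done.countDown();
                });
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Each logged line names both threads, which makes it easy to see that production and consumption happen in different places, as with subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()).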
Unsubscribe
When a subscription is no longer needed, remember to unsubscribe. subscribe() returns a Subscription that you keep for this purpose:
Subscription sub = getGistObservable()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Gist>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
                Log.d("Rxjava", e + "");
            }
            @Override
            public void onNext(Gist gist) {
                // Output
                for (Map.Entry<String, GistFile> entry : gist.files.entrySet()) {
                    Log.d(TAG, entry.getKey());
                    Log.d(TAG, "Length of file: " + entry.getValue().content.length());
                }
            }
        });
|
sub.unsubscribe();
|
Call unsubscribe() when the Activity is destroyed, for example in onDestroy().
Lesson 1 example
package io.caster.rxexamples;

import android.content.Context;
import android.content.DialogInterface;
import android.os.AsyncTask;
import android.os.Bundle;
import android.support.annotation.Nullable;
import android.support.v7.app.AlertDialog;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import com.google.gson.Gson;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import java.io.IOException;
import java.util.Map;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func0;
import rx.schedulers.Schedulers;

public class MainActivity extends AppCompatActivity {
    private static final String TAG = MainActivity.class.getSimpleName();

    public static void showCustomDialog(String title, String message, Context activity) {
        AlertDialog.Builder alertDialog = new AlertDialog.Builder(activity);
        alertDialog.setTitle(title);
        alertDialog.setMessage(message);
        alertDialog.setNegativeButton(activity.getString(android.R.string.ok), new DialogInterface.OnClickListener() {
            public void onClick(DialogInterface dialog, int which) {
                dialog.cancel();
            }
        });
        alertDialog.show();
    }

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        getGistObservable()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Gist>() {
                    @Override
                    public void onCompleted() {
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(Gist gist) {
                        // Output
                        for (Map.Entry<String, GistFile> entry : gist.files.entrySet()) {
                            Log.d(TAG, entry.getKey());
                            Log.d(TAG, "Length of file: " + entry.getValue().content.length());
                        }
                    }
                });
    }

    @Nullable
    private Gist getGist() throws IOException {
        OkHttpClient client = new OkHttpClient();
        // Go get this Gist: https://gist.github.com/donnfelker/db72a05cc03ef523ee74
        // via the GitHub API
        Request request = new Request.Builder()
                .url("https://api.github.com/gists/db72a05cc03ef523ee74")
                .build();
        Response response = client.newCall(request).execute();
        if (response.isSuccessful()) {
            Gist gist = new Gson().fromJson(response.body().charStream(), Gist.class);
            return gist;
        }
        return null;
    }

    public Observable<Gist> getGistObservable() {
        return Observable.defer(new Func0<Observable<Gist>>() {
            @Override
            public Observable<Gist> call() {
                try {
                    return Observable.just(getGist());
                } catch (IOException e) {
                    return null;
                }
            }
        });
    }
}
|
In the code above, getGist() must not run at the moment the Observable is created. If we used
Observable.just(getGist()) alone, without defer(), getGist() would run immediately, on the calling thread, while the Observable is being built.
We want it to run only when someone subscribes, so we wrap the call in defer().
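The difference between eager and deferred evaluation can be checked without RxJava. The following is a plain-Java analogy in which a Supplier stands in for the Observable; DeferSketch, fetchGist() and the call counter are hypothetical names used only for this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java analogy only: a Supplier stands in for an Observable.
class DeferSketch {
    /** Counts how often the "expensive" call has actually run. */
    static final AtomicInteger calls = new AtomicInteger();

    /** Hypothetical stand-in for the blocking getGist() network call. */
    static String fetchGist() {
        calls.incrementAndGet();
        return "gist";
    }

    /** Eager, like Observable.just(getGist()): the call runs before the wrapper even exists. */
    static Supplier<String> eager() {
        String value = fetchGist();
        return () -> value;
    }

    /** Deferred, like Observable.defer(...): nothing runs until get() is called. */
    static Supplier<String> deferred() {
        return () -> fetchGist();
    }

    public static void main(String[] args) {
        Supplier<String> e = eager();
        System.out.println("after eager(): calls = " + calls.get());
        Supplier<String> d = deferred();
        System.out.println("after deferred(): calls = " + calls.get());
        d.get();
        System.out.println("after d.get(): calls = " + calls.get());
    }
}
```

Building the eager wrapper already triggers the call, while the deferred wrapper triggers it only on get(), which plays the role of subscribe() here.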
RxJava Error Handling
Instead of returning null from the catch block, return Observable.error(e).
The exception is then delivered to the Observer's onError().
package io.caster.rxexamples;

import android.content.Context;
import android.content.DialogInterface;
import android.os.AsyncTask;
import android.os.Bundle;
import android.support.annotation.Nullable;
import android.support.v7.app.AlertDialog;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import com.google.gson.Gson;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import java.io.IOException;
import java.util.Map;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func0;
import rx.schedulers.Schedulers;

public class MainActivity extends AppCompatActivity {
    private static final String TAG = MainActivity.class.getSimpleName();

    public static void showCustomDialog(String title, String message, Context activity) {
        AlertDialog.Builder alertDialog = new AlertDialog.Builder(activity);
        alertDialog.setTitle(title);
        alertDialog.setMessage(message);
        alertDialog.setNegativeButton(activity.getString(android.R.string.ok), new DialogInterface.OnClickListener() {
            public void onClick(DialogInterface dialog, int which) {
                dialog.cancel();
            }
        });
        alertDialog.show();
    }

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        getGistObservable()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Gist>() {
                    @Override
                    public void onCompleted() {
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.d("Rxjava", e + "");
                    }
                    @Override
                    public void onNext(Gist gist) {
                        // Output
                        for (Map.Entry<String, GistFile> entry : gist.files.entrySet()) {
                            Log.d(TAG, entry.getKey());
                            Log.d(TAG, "Length of file: " + entry.getValue().content.length());
                        }
                    }
                });
    }

    @Nullable
    private Gist getGist() throws IOException {
        OkHttpClient client = new OkHttpClient();
        // Go get this Gist: https://gist.github.com/donnfelker/db72a05cc03ef523ee74
        // via the GitHub API
        Request request = new Request.Builder()
                .url("https://api.github.com/gists/db72a05cc03ef523ee74")
                .build();
        Response response = client.newCall(request).execute();
        if (response.isSuccessful()) {
            Gist gist = new Gson().fromJson(response.body().charStream(), Gist.class);
            return gist;
        }
        return null;
    }

    public Observable<Gist> getGistObservable() {
        return Observable.defer(new Func0<Observable<Gist>>() {
            @Override
            public Observable<Gist> call() {
                try {
                    return Observable.just(getGist());
                } catch (IOException e) {
                    return Observable.error(e);
                }
            }
        });
    }
}
|
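The routing of a failure into onError() can be modeled in a few lines of plain Java. This is a rough sketch of the callback contract only, not RxJava's implementation: MiniObserver is a made-up stand-in for rx.Observer, and subscribe() and record() are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Plain-Java model of the callback contract; none of these types come from RxJava.
class ErrorRoutingSketch {

    /** Minimal stand-in for rx.Observer. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Runs the source on subscription: a failure becomes onError, success becomes onNext then onCompleted. */
    static <T> void subscribe(Callable<T> source, MiniObserver<T> observer) {
        T item;
        try {
            item = source.call();
        } catch (Exception e) {
            observer.onError(e); // terminal event: neither onNext nor onCompleted follows
            return;
        }
        observer.onNext(item);
        observer.onCompleted();
    }

    /** Subscribes a recording observer and returns the callbacks it received, in order. */
    static List<String> record(Callable<String> source) {
        final List<String> events = new ArrayList<>();
        subscribe(source, new MiniObserver<String>() {
            @Override
            public void onNext(String item) {
                events.add("onNext:" + item);
            }
            @Override
            public void onError(Throwable e) {
                events.add("onError:" + e.getMessage());
            }
            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }
        });
        return events;
    }
}
```

A source that throws an IOException ends up in onError() and nothing else is called, which mirrors what the Observer in the listing above sees when getGistObservable() returns Observable.error(e).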
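Understanding the RxJava Observable.zip Operator
Given two Observables, zip() pairs their emissions by position and combines each pair into a single item with a function you supply.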
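The pairing rule can be illustrated on plain lists. This is a sketch of the semantics only, not of RxJava itself: ZipSketch and its zip() helper are hypothetical, and lists stand in for the two Observables. In RxJava 1.x the corresponding call is Observable.zip(first, second, func2) with a Func2 as the combining function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java illustration of zip semantics; lists stand in for the two Observables.
class ZipSketch {

    /** Pairs items by position and combines each pair; stops when the shorter input runs out. */
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combine) {
        int n = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combine.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "Hi", "Aloha");
        List<Integer> numbers = Arrays.asList(1, 2);
        // "Aloha" has no partner, so it is dropped.
        System.out.println(zip(words, numbers, (w, n) -> w + "#" + n)); // prints [Hello#1, Hi#2]
    }
}
```

The result has as many items as the shorter input, which matches zip() emitting only as many items as the source that emits the fewest.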
How to use RxJava with Retrofit
Dependencies
compile 'io.reactivex:rxandroid:1.2.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex:rxjava:1.1.6'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.0-beta4'
|
Retrofit retrofit = new Retrofit.Builder()
        .baseUrl(baseUrl)
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .build();
|
Change the service interface from returning a Call:
public interface MovieService {
    @GET("top250")
    Call<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);
}
|
to returning an Observable:
public interface MovieService {
    @GET("top250")
    Observable<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);
}
|
Usage
MovieService movieService = retrofit.create(MovieService.class);
movieService.getTopMovie(0, 10)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<MovieEntity>() {
            @Override
            public void onCompleted() {
                Toast.makeText(MainActivity.this, "Get Top Movie Completed", Toast.LENGTH_SHORT).show();
            }
            @Override
            public void onError(Throwable e) {
                resultTV.setText(e.getMessage());
            }
            @Override
            public void onNext(MovieEntity movieEntity) {
                resultTV.setText(movieEntity.toString());
            }
        });
|