RxJava初体验

436 查看

序言

公司中项目用到了RXJava,搜集了一些资料来学习,同时项目中使用了okhttp,所以借此学习机会,自己用RxJava和RxAndroid,然后使用OkHttp实现了一个简单的demo来进行学习,同时通过RxJava实现了一个事件总线,RxBus。本文将结合自己demo,对RxJava进行一个简单的介绍,通过这篇文章可以帮助你便可以在项目中使用RxJava进行一些简单的操作。

RxJava

开始看的第一天,感觉不到这个东西的伟大之处,现在是跪着在看了,说起好处(胸大?还是是臀圆?),先想想我们的在Android中的项目,当我们进行一个网络请求的时候,我们怎么做呢?开一个线程,然后通过Handler传递,或者是通过android提供的异步工具类,但是这么写起来是不是烦的不行,线程的切换烦的不行,当我们的逻辑发生了变化,去修改也是相当的麻烦,而且通过Handler的传递本身也产生了一定的依赖,RxJava 采用观察者模式,事件触发通知观察者,响应式,前面的View响应我们产生的事件。如果你理解了观察者模式,理解起来RxJava将变得非常简单,RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

对于RxJava更详细的讲解请移步大头鬼博客,当然我的代码是建立在你对RxJava已经有一个初步认识的基础上。

 public void testRxJava1(){
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>(){
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello world");
                subscriber.onCompleted();
            }
        });

        Subscriber<String> mySubscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
            //rvtv为一个TextView
                rvtv.setText(s);
            }
        };
        observable.subscribe(mySubscriber);

    }

这里我们首先声明了一个被观察者,被观察中,我们可以制定一些操作,这里制定了一个输出Hello world,非常简单,在call方法中对于调用观察者的方法,这个观察者的方法便会得到执行,最后需要被观察者订阅一下我们的观察者,当该方法被调用的时候,我们的TextView上便会显示出来文字了。为了简洁,我们也可以不用写这个被观察者,通过制定action即可。

observable.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                rvtv.setText(s);
            }
        });

既然观察着有简写,那么被观察者呢?

public void testRxJava2(){
        Observable.just("HI","hi").subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                rvtv.setText(s);
            }
        });

    }

当我们只是进行一些简单的传递的时候,我们可以通过just来实现。
RxJava中的操作符,我想这个写起来,可能足够写一篇博客的。最常用的,也是最简单的一个map,RxJava中的操作符,返回的对象是Observable,通过这,我们可以实现一个数据流式的调用,让数据流进行传递,然后将其中的数据进行相应的处理之后,向下进行传递。

 public void testRxJava3(){
        Observable.just("Hi,I'm Jensen").map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return s + "?";
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                rvtv2.setText(s);
            }
        });
    }

下面的例子是借助于OkHttp来获取百度的首页数据然后进行展示,通过OkHttp来进行数据的获取非常简单实例代码.

public String sendGetRequest(String url) throws IOException{
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder().url(url).build();
        Response response = client.newCall(request).execute();
        if(response.isSuccessful()){
            return response.body().string();
        }else{
            throw new IOException("Unexpected code");
        }
    }

这些逻辑如果我们不借助RxJava,我们需要开启一个新的线程,然后通过Handler将数据传递到主线程中,然后我们的这种方式可以进行,我们借助RxJava,则可以对被观察者和被观察者执行的操作制定相应的线程执行。这点也是RxJava的惊人之处和方便之处,线程的切换非常的方便。

 public void testRxJavaWithOkHttp(){
         Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                String result=null;
                try{
                     result = sendGetRequest("http://www.baidu.com");
                }catch (IOException e){

                }
                if(result!=null)
                subscriber.onNext(result);
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        rvtv2.setText(s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        rvtv2.setText("出现了错误");
                    }
                });
    }

在这里我们对其指定了相应的执行线程。

RxBus

rxbus,这个是一个事件总线,当我们有事件触发了,我们就可以将我们的事件丢到总线上,然后订阅了该事件的观察者便会被通知,然后根据我们的逻辑执行。前面我们写的例子,再执行一次之后便会将事件传递给观察者之后就结束了,但是我们可能有些轮询操作,当轮询到结果的时候,便把轮询的结果丢给RxBus,即可继续自己的操作,事件就会通过RxBus传递给订阅者。

public class RxBus {
    private static volatile  RxBus mInstance;
    private final Subject mBus;


    private RxBus(){
        mBus = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getmInstance(){
        if(mInstance==null){
            synchronized (RxBus.class){
                if(mInstance==null)
                    mInstance = new RxBus();
            }
        }
        return mInstance;
    }

    public void post(Object o){
        mBus.onNext(o);
    }

    public <T>Observable<T> toObservable(Class<T> eventType){
        return mBus.ofType(eventType);
    }
}

这里的Subject可以看做是观察者和被观察者的一个中介,通过bus的toObservable方法根据传递进的我们所要监听事件,然后我们可以得到一个被观察者,通过被观察者进行订阅,当我们post的时候,Subject的内部是调用了其onNext方法传递了我们的响应事件对象,Subject既可以作为一个观测者,同时也可以作为一个被观测者,这里根据传递的事件类型,返回一个针对该事件类型被观测者,然后当post的时候调用了该种贯彻者的onNext方法,从而我们的订阅者便可以从中得到反馈信息。下面一个简单的使用。

 public void publishSubjectTest(){
        RxBus bus = RxBus.getmInstance();
        NetEvent event = new NetEvent();
        event.setContent("Hi,I'm RxBus");
        Subscription sb = bus.toObservable(NetEvent.class)
                .subscribe(new Action1<NetEvent>() {
                    @Override
                    public void call(NetEvent netEvent) {
                        rvtv2.setText(netEvent.getContent());
                    }
                });

       bus.post(event);
    }

很粗略的说了一下RxJava的用法,当然对其精髓理解还是不够深刻的,后续将会根据自己的使用,对其进行更进一步的剖析。