RxJava操作符在android中的使用场景详解(二)

365 查看

转载请注明出处:http://www.wangxinarhat.com/2016/05/01/2016-05-01-rxjava-android-operate2/

最近比较忙,也没想好这个文章该怎么写下去。可能会比较水,不过做事不能虎头蛇尾,所以继续吧。

场景五:BehaviorSubject操作符的使用(桥梁)

使用场景:制作缓存

效果图

代码:

  • 缓存管理类

    public class DataCache {        
                /**
                 * 读取磁盘缓存数据
                 */
                public List<ImageInfoBean> readData() {
                    ...
                }
            
                /**
                 * 写缓存
                 */
                public void writeData(List<ImageInfoBean> list) {
                    ...       
                }
            
                /**
                 * 删除缓存
                 */
                public boolean deleteCache() {
                    ...
                }
            }
  • 数据管理类

    public class Data {
        private static Data instance;
        private static final int DATA_SOURCE_MEMORY = 1;//内存
        private static final int DATA_SOURCE_DISK = 2;//硬盘
        private static final int DATA_SOURCE_NETWORK = 3;//网络
        BehaviorSubject<List<ImageInfoBean>> cache;
        private int dataSource;    
        
        private Data() {
        }    
        
        public static Data newInstance() {
            if (instance == null) {
                instance = new Data();
            }
            return instance;
        }    
        private void setDataSource(@DataSource int dataSource) {
            this.dataSource = dataSource;
        }    
        
        public String getDataSourceText() {
            int dataSourceTextRes;
            switch (dataSource) {
                case DATA_SOURCE_MEMORY:
                    dataSourceTextRes = R.string.data_source_memory;
                    break;
                case DATA_SOURCE_DISK:
                    dataSourceTextRes = R.string.data_source_disk;
                    break;
                case DATA_SOURCE_NETWORK:
                    dataSourceTextRes = R.string.data_source_network;
                    break;
                default:
                    dataSourceTextRes = R.string.data_source_network;
            }
            return BaseApplication.getApplication().getString(dataSourceTextRes);
        }    
        /**
         * 请求网络数据
         */
        public void loadData() {    
            Network.getGankApi()
                    .getBeauties(80, 1)
                    .map(BeautyResult2Beautise.newInstance())
                    .doOnNext(new Action1<List<ImageInfoBean>>() {
                        @Override
                        public void call(List<ImageInfoBean> list) {
                            DataCache.newInstance().writeData(list);
                        }
                    })
                    .subscribe(new Action1<List<ImageInfoBean>>() {
                        @Override
                        public void call(List<ImageInfoBean> list) {
                            cache.onNext(list);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            throwable.printStackTrace();
                        }
                    });    
        }    
        /**
         * 获取数据
         * @param observer
         * @return
         */
        public Subscription subscribeData(@Nullable Observer<List<ImageInfoBean>> observer) {    
            if (null == cache) {
                cache = BehaviorSubject.create();
                Observable.create(new Observable.OnSubscribe<List<ImageInfoBean>>() {
                    @Override
                    public void call(Subscriber<? super List<ImageInfoBean>> subscriber) {
                        //从缓存获取数据
                        List<ImageInfoBean> list = DataCache.newInstance().readData();    
                        if (null == list) {
                            setDataSource(DATA_SOURCE_NETWORK);
                            //请求网络数据
                            loadData();
                        } else {
                            setDataSource(DATA_SOURCE_DISK);
                            subscriber.onNext(list);
                        }    
                    }
                })
                        .subscribeOn(Schedulers.io()).subscribe(cache);    
            } else {
                //内存中获取的数据
                setDataSource(DATA_SOURCE_MEMORY);
            }    
            return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);    
        }    
        /**
         * 清空内存
         */
        public void clearMemoryCache() {
            cache = null;
        }    
        /**
         * 清空内存和硬盘数据
         */
        public void clearMemoryAndDiskCache() {
            clearMemoryCache();
            DataCache.newInstance().deleteCache();
        }
    }
  • 获取数据

    @OnClick(R.id.load)
    public void onClick() {
        startTime = System.currentTimeMillis();
        swipeRefreshLayout.setRefreshing(true);
        unsubscribe();
        subscription = Data.newInstance()
                .subscribeData(getObserver());
    }
  • 在观察者中进行获取数据结果的处理

    private Observer<List<ImageInfoBean>> getObserver() {
        if (null == observer) {
            observer = new Observer<List<ImageInfoBean>>() {
                @Override
                public void onCompleted() {
                }
                @Override
                public void onError(Throwable e) {
                    Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
                }
                @Override
                public void onNext(List<ImageInfoBean> list) {
                    swipeRefreshLayout.setRefreshing(false);
                    int loadingTime = (int) (System.currentTimeMillis() - startTime);
                    dataSituation.setText(getString(R.string.loading_time_and_source, loadingTime, Data.newInstance().getDataSourceText()));
                    adapter.setImages(list);
                }
            };
        }
        return observer;
    }

详解

Subject可以看成是一个桥梁或者代理,在RxJava中同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

场景六:retryWhen操作符的使用(错误处理)

使用场景:有的 token 并非一次性的,而是可以多次使用,直到它超时或被销毁(多数 token 都是这样的)。

这样的 token 处理起来比较麻烦:需要把它保存起来,并且在发现它失效的时候要能够自动重新获取新的 token >并继续访问之前由于 token 失效而失败的请求。

如果项目中有多处的接口请求都需要这样的自动修复机制,使用传统的 Callback 形式需要写出非常复杂的代码。
而使用 RxJava ,可以用 retryWhen() 来轻松地处理这样的问题。

效果图

Token API准备

由于找不到足够简单的用于示例的 token API,以下API是代码伪造的

    /**
     * Created by wangxinarhat on 16-4-5.
     * TokenApi
     */
    public class TokenApi {
    
        /**
         * 获取Observable
         * @param auth
         * @return
         */
        public static Observable<Token> getToken(@NonNull String auth) {
            return Observable.just(auth).map(new Func1<String, Token>() {
                @Override
                public Token call(String s) {
    
                    try {
                        Thread.sleep(new Random().nextInt(600) + 600);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    Token token = new Token();
                    token.token = createToken();
    
                    return token;
                }
            });
        }
    
        /**
         * 随机生成token
         * @return
         */
        private static String createToken() {
    
            return "token_wangxinarhat_" + System.currentTimeMillis() % 1000;
        }
    
    
        /**
         * 根据Token获取用户数据
         * @param token
         * @return
         */
        public static Observable<DataInfo> getData(@NonNull Token token) {
            return Observable.just(token).map(new Func1<Token, DataInfo>() {
                @Override
                public DataInfo call(Token token) {
    
                    try {
                        Thread.sleep(new Random().nextInt(600) + 600);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    if (token.isInvalid) {
                        throw new IllegalArgumentException("Token is invalid");
                    }
                    DataInfo dataInfo = new DataInfo();
                    dataInfo.id = (int) (System.currentTimeMillis() % 1000);
                    dataInfo.name = "USER_" + dataInfo.id;
    
                    return dataInfo;
                }
            });
        }
    }  
   

Token

    /**
     *Token类
     */
    public class Token {
        public String token;
        public boolean isInvalid;//token是否失效    

        public Token(boolean isInvalid) {
            this.isInvalid = isInvalid;
        }    

        public Token() {
        }
    }

用户数据

    /**
     * Created by wangxinarhat on 16-4-5.
     * 用户数据
     */
    public class DataInfo {
        public int id;
        public String name;
    }

操作符的使用

  • 根据token请求数据

    @OnClick(R.id.requestBt)
    void request() {
        tokenUpdated = false;
        swipeRefreshLayout.setRefreshing(true);
        unsubscribe();
        final TokenApi tokenApi = new TokenApi();
        subscription = Observable.just(null).flatMap(new Func1<Object, rx.Observable<DataInfo>>() {
            @Override
            public Observable<DataInfo> call(Object o) {
    
                return null == cachedFakeToken.token ?
                        Observable.<DataInfo>error(new NullPointerException("token id null")) :
                        tokenApi.getData(cachedFakeToken);
            }
        }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) {
                            return tokenApi.getToken("flat_map")
                                    .doOnNext(new Action1<Token>() {
                                        @Override
                                        public void call(Token token) {
                                            tokenUpdated = true;
                                            cachedFakeToken.token = token.token;
                                            cachedFakeToken.isInvalid = token.isInvalid;
                                        }
                                    });
                        }
                        return Observable.just(throwable);
                    }
                });
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<DataInfo>() {
                    @Override
                    public void call(DataInfo dataInfo) {
                        swipeRefreshLayout.setRefreshing(false);
                        String token = cachedFakeToken.token;
                        if (tokenUpdated) {
                            token += "(" + getString(R.string.updated) + ")";
                        }
                        tokenTv.setText(String.format(getString(R.string.got_token_and_data), token, dataInfo.id, dataInfo.name));
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        swipeRefreshLayout.setRefreshing(false);
                        Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
                    }
                });
    } 
  • 销毁token

    @OnClick(R.id.invalidateTokenBt)
    void incalidate() {
        cachedFakeToken.isInvalid = true;
        Toast.makeText(getActivity(), R.string.token_expired, Toast.LENGTH_SHORT).show();
    }

详解

如果原始Observable遇到错误,重新订阅它期望它能正常终止。

retryWhen操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。

retryWhen总是传递onNext通知给观察者,由于重新订阅,可能会造成数据项重复。

无论收到多少次onError通知,无参数版本的retryWhen都会继续订阅并发射原始Observable。
接受单个count参数的retryWhen会最多重新订阅指定的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个onError通知传递给它的观察者。

还有一个版本的retryWhen接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射onError通知的Throwable。这个函数返回一个布尔值,如果返回true,retryWhen应该再次订阅和镜像原始的Observable,如果返回false,retryWhen会将最新的一个onError通知传递给它的观察者。
retryWhen操作符默认在trampoline调度器上执行。

场景七:Debounce操作符的使用(过滤)

使用场景

实时搜索,如果在EditText中监听到字符改变就发起请求数据,明显不合适。
有了Debounce操作符,仅在过了指定的一段时间还没发射数据时才发射一个数据,Debounce操作符会过滤掉发射速率过快的数据项,优化网络请求

效果图

代码

  • 配合jakewharton大神的rxbinding使用,获取可观察对象

    @Override
    public void onActivityCreated(@Nullable Bundle savedInstanceState) {
        super.onActivityCreated(savedInstanceState);
        setLogger();
        //使用rxbing给EditText注册字符改变事件
        subscription = RxTextView.textChangeEvents(input)
                .debounce(500, TimeUnit.MILLISECONDS)//设置发射时间间隔
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    } 
  • 在观察者中进行结果处理

     /**
     * 获取观察者
     * @return
     */
    private Observer<? super TextViewTextChangeEvent> getObserver() {
    
        return new Observer<TextViewTextChangeEvent>() {
            @Override
            public void onCompleted() {
            }
    
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(TextViewTextChangeEvent textViewTextChangeEvent) {
                //得到搜索关键字,进行网络请求
                log(String.format("搜索关键字 : %s", textViewTextChangeEvent.text().toString()));
            }
        };
    }    
  • 更新adapter数据集

     /**
     * 更新adapter数据集
     *
     * @param logMsg
     */
    private void log(String logMsg) {
    
        if (isCurrentlyOnMainThread()) {
            mLogs.add(0, logMsg + " (main thread) ");
            mAdapter.notifyDataSetChanged();
    
        } else {
            mLogs.add(0, logMsg + " (NOT main thread) ");
            new Handler(Looper.getMainLooper()).post(new Runnable() {
                @Override
                public void run() {
                    mAdapter.notifyDataSetChanged();
                }
            });
    
        }
    }

详解

Debounce仅在过了一段指定的时间还没发射数据时才发射一个数据,会根据设置的时间间隔过滤掉发射速率过快的数据项。

场景八:Buffer操作符的使用(变换)

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
这个操作符,我暂时还没有比较好的使用场景,不过既然是可以定期收集数据,那么应该可以做指定时间内点击次数等之类的统计。

效果图

代码

  • 还是使用jakewharton大神的rxbinding,注册点击事件获取可观察对象

    @Override
    public void onActivityCreated(@Nullable Bundle savedInstanceState) {
        super.onActivityCreated(savedInstanceState);
        setLogger();
        subscription = RxView.clicks(btn)
                .map(new Func1<Void, Integer>() {
                    @Override
                    public Integer call(Void aVoid) {
                        log("点击一次");
                        return 1;
                    }
                })
                .buffer(3, TimeUnit.SECONDS)//设置收集数据时间间隔为3s
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }    

详解

Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。

还有就是:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

小结

因为我也才尝试使用rx,这篇终于挤出来了,好难。。代码在这里

如果又学到新的使用场景,还是会再写。

说明

我是从国内rx大神扔物线,还有github上star数最多的那位哥们儿(kaushikgopal)学习的。因为他们都没有很详细的说明操作符的使用,所以才想写这个文章。

如想深入学习,请看大神代码。