RXJava操作符——转换类
Observable 的缓存区,源 Observable 发送出来的对象会进过缓存区,当 buffer 里缓存了一定数量的对象,则将它们以 List 的形式发送出去,当源 Observable 发送出 onCompleted 或 onError 通知时,无论当前 buffer 中的对象数量是否足够都会将这个 buffer 发送出去并且传递 onCompleted 或 onError 通知。
将源 Observable 对象转换成多个 Observable 对象并将其展开,再将得到的子项发送出去。由于最终将子项合并到到一个 Observable 对象,所以这些子项的顺序可能是相互交错的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Observable.just(user1, user2, user3).flatMap(new Func1<User, Observable<User.Book>>() { @Override public Observable<User.Book> call(User user) { System.out.println(user.getName()); return Observable.from(user.getBooks()); } }).subscribe(new Observer<User.Book>() { @Override public void onCompleted() { System.out.println("completed"); }
@Override public void onError(Throwable e) { System.out.println(e.getMessage()); }
@Override public void onNext(User.Book book) { System.out.println("bookName:" + book.getBookName() + "->price:" + book.getBokPrice()); } });
|
上例中 User 中包含一个 Book 列表,通过 flatmap 操作符可将 User 展开得到 Book 的信息。输出结果:
1 2 3 4 5 6 7 8 9 10 11 12
| John bookName:book1->price:20 bookName:book3->price:35 bookName:book5->price:45 Tim bookName:book1->price:20 bookName:book2->price:23 bookName:book4->price:43 Amy bookName:book2->price:23 bookName:book4->price:43 completed
|
将源 Observable 对象发送的子项按自定义的规则进行分类输出。
对源 Observable 对象发送的子项进行某个特定操作后继续将其发送出去。
累加器,将源 Observable 发送的第一个对象继续发送,从第二个开始,将前一个发送的结果返回到回调方法中同当前对象一起处理后再发送出去。与之类似的还有一个 reduce 操作符,区别在于 reduce 只输出最终结果,而 scan 会输出过程中得到的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Observable.just(1, 2, 3, 4, 5) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer item) { return sum + item; } }).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("onNext: " + item); }
@Override public void onError(Throwable error) { System.err.println("onError: " + error.getMessage()); }
@Override public void onCompleted() { System.out.println("Sequence complete."); } });
|
输出:
1 2 3 4 5 6
| Next: 1 Next: 3 Next: 6 Next: 10 Next: 15 Sequence complete.
|
类似于 buffer, 不同的是,window 在发送一个缓存区的时候是以 Observable 的形式发送的,即每发送一次都是一个完整的 Observable,会调用 onCompleted 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| Observable.just(1,2,3,4,5,6).window(2).subscribe(new Observer<Observable<Integer>>() { @Override public void onCompleted() { System.out.println("onComleted"); }
@Override public void onError(Throwable e) {
}
@Override public void onNext(Observable<Integer> integerObservable) { integerObservable.subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("completed"); }
@Override public void onError(Throwable e) { System.out.println("onError:" + e.getMessage()); }
@Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); } }); } });
|
输出结果:
1 2 3 4 5 6 7 8 9 10
| onNext:1 onNext:2 completed onNext:3 onNext:4 completed onNext:5 onNext:6 completed onComleted
|