1 package au.gov.amsa.util.rx;
2
import java.util.Iterator;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;
6
7 public class OperatorFlattenIterable<T> implements Operator<T, Iterable<T>> {
8
9 public static <T> Operator<T, Iterable<T>> flatten() {
10 return new OperatorFlattenIterable<T>();
11 }
12
13 @Override
14 public Subscriber<? super Iterable<T>> call(
15 final Subscriber<? super T> subscriber) {
16 return new Subscriber<Iterable<T>>(subscriber) {
17
18 @Override
19 public void onCompleted() {
20 subscriber.onCompleted();
21 }
22
23 @Override
24 public void onError(Throwable e) {
25 subscriber.onError(e);
26 }
27
28 @Override
29 public void onNext(Iterable<T> list) {
30 for (T t : list) {
31 if (subscriber.isUnsubscribed()) {
32 return;
33 }
34 try {
35 subscriber.onNext(t);
36 } catch (Exception e) {
37 onError(OnErrorThrowable.addValueAsLastCause(e, t));
38 return;
39 }
40 }
41 }
42 };
43 }
44 }