View Javadoc
1   package au.gov.amsa.util.rx;
2   
3   import rx.Observable.Operator;
4   import rx.Subscriber;
5   import rx.exceptions.OnErrorThrowable;
6   
7   public class OperatorFlattenIterable<T> implements Operator<T, Iterable<T>> {
8   
9   	public static <T> Operator<T, Iterable<T>> flatten() {
10  		return new OperatorFlattenIterable<T>();
11  	}
12  
13  	@Override
14  	public Subscriber<? super Iterable<T>> call(
15  			final Subscriber<? super T> subscriber) {
16  		return new Subscriber<Iterable<T>>(subscriber) {
17  
18  			@Override
19  			public void onCompleted() {
20  				subscriber.onCompleted();
21  			}
22  
23  			@Override
24  			public void onError(Throwable e) {
25  				subscriber.onError(e);
26  			}
27  
28  			@Override
29  			public void onNext(Iterable<T> list) {
30  				for (T t : list) {
31  					if (subscriber.isUnsubscribed()) {
32  						return;
33  					}
34  					try {
35  						subscriber.onNext(t);
36  					} catch (Exception e) {
37  						onError(OnErrorThrowable.addValueAsLastCause(e, t));
38  						return;
39  					}
40  				}
41  			}
42  		};
43  	}
44  }