1 package au.gov.amsa.util.rx; 2 3 import rx.Observable.Operator; 4 import rx.Subscriber; 5 import rx.exceptions.OnErrorThrowable; 6 7 public class OperatorFlattenIterable<T> implements Operator<T, Iterable<T>> { 8 9 public static <T> Operator<T, Iterable<T>> flatten() { 10 return new OperatorFlattenIterable<T>(); 11 } 12 13 @Override 14 public Subscriber<? super Iterable<T>> call( 15 final Subscriber<? super T> subscriber) { 16 return new Subscriber<Iterable<T>>(subscriber) { 17 18 @Override 19 public void onCompleted() { 20 subscriber.onCompleted(); 21 } 22 23 @Override 24 public void onError(Throwable e) { 25 subscriber.onError(e); 26 } 27 28 @Override 29 public void onNext(Iterable<T> list) { 30 for (T t : list) { 31 if (subscriber.isUnsubscribed()) { 32 return; 33 } 34 try { 35 subscriber.onNext(t); 36 } catch (Exception e) { 37 onError(OnErrorThrowable.addValueAsLastCause(e, t)); 38 return; 39 } 40 } 41 } 42 }; 43 } 44 }