1 package au.gov.amsa.util.rx; 2 3 import java.util.Map; 4 import java.util.Map.Entry; 5 import java.util.concurrent.atomic.AtomicBoolean; 6 7 import org.apache.log4j.Logger; 8 9 import rx.Observable.Operator; 10 import rx.Observer; 11 import rx.Subscriber; 12 import rx.Subscription; 13 import rx.functions.Func2; 14 import rx.observers.Subscribers; 15 16 public class OperatorMapEntries<A, B, C> implements Operator<C, Map<A, B>> { 17 18 private static Logger log = Logger.getLogger(OperatorMapEntries.class); 19 private final Func2<A, B, C> function; 20 21 public OperatorMapEntries(Func2<A, B, C> function) { 22 this.function = function; 23 } 24 25 @Override 26 public Subscriber<? super Map<A, B>> call(final Subscriber<? super C> child) { 27 final AtomicBoolean subscribed = new AtomicBoolean(true); 28 child.add(new Subscription() { 29 @Override 30 public void unsubscribe() { 31 subscribed.set(false); 32 } 33 34 @Override 35 public boolean isUnsubscribed() { 36 return !subscribed.get(); 37 } 38 }); 39 Subscriber<Map<A, B>> parent = Subscribers 40 .from(new Observer<Map<A, B>>() { 41 42 @Override 43 public void onCompleted() { 44 child.onCompleted(); 45 } 46 47 @Override 48 public void onError(Throwable e) { 49 child.onError(e); 50 } 51 52 @Override 53 public void onNext(Map<A, B> map) { 54 log.info("emitting map with " + map.size() + " entries"); 55 for (Entry<A, B> entry : map.entrySet()) { 56 if (subscribed.get()) { 57 child.onNext(function.call(entry.getKey(), 58 entry.getValue())); 59 } else 60 return; 61 } 62 } 63 }); 64 child.add(parent); 65 return parent; 66 } 67 }