1 package au.gov.amsa.util.rx;
2
3 import java.util.Map;
4 import java.util.Map.Entry;
5 import java.util.concurrent.atomic.AtomicBoolean;
6
7 import org.apache.log4j.Logger;
8
9 import rx.Observable.Operator;
10 import rx.Observer;
11 import rx.Subscriber;
12 import rx.Subscription;
13 import rx.functions.Func2;
14 import rx.observers.Subscribers;
15
16 public class OperatorMapEntries<A, B, C> implements Operator<C, Map<A, B>> {
17
18 private static Logger log = Logger.getLogger(OperatorMapEntries.class);
19 private final Func2<A, B, C> function;
20
21 public OperatorMapEntries(Func2<A, B, C> function) {
22 this.function = function;
23 }
24
25 @Override
26 public Subscriber<? super Map<A, B>> call(final Subscriber<? super C> child) {
27 final AtomicBoolean subscribed = new AtomicBoolean(true);
28 child.add(new Subscription() {
29 @Override
30 public void unsubscribe() {
31 subscribed.set(false);
32 }
33
34 @Override
35 public boolean isUnsubscribed() {
36 return !subscribed.get();
37 }
38 });
39 Subscriber<Map<A, B>> parent = Subscribers
40 .from(new Observer<Map<A, B>>() {
41
42 @Override
43 public void onCompleted() {
44 child.onCompleted();
45 }
46
47 @Override
48 public void onError(Throwable e) {
49 child.onError(e);
50 }
51
52 @Override
53 public void onNext(Map<A, B> map) {
54 log.info("emitting map with " + map.size() + " entries");
55 for (Entry<A, B> entry : map.entrySet()) {
56 if (subscribed.get()) {
57 child.onNext(function.call(entry.getKey(),
58 entry.getValue()));
59 } else
60 return;
61 }
62 }
63 });
64 child.add(parent);
65 return parent;
66 }
67 }