View Javadoc
1   package au.gov.amsa.util.rx;
2   
3   import java.util.Map;
4   import java.util.Map.Entry;
5   import java.util.concurrent.atomic.AtomicBoolean;
6   
7   import org.apache.log4j.Logger;
8   
9   import rx.Observable.Operator;
10  import rx.Observer;
11  import rx.Subscriber;
12  import rx.Subscription;
13  import rx.functions.Func2;
14  import rx.observers.Subscribers;
15  
16  public class OperatorMapEntries<A, B, C> implements Operator<C, Map<A, B>> {
17  
18  	private static Logger log = Logger.getLogger(OperatorMapEntries.class);
19  	private final Func2<A, B, C> function;
20  
21  	public OperatorMapEntries(Func2<A, B, C> function) {
22  		this.function = function;
23  	}
24  
25  	@Override
26  	public Subscriber<? super Map<A, B>> call(final Subscriber<? super C> child) {
27  		final AtomicBoolean subscribed = new AtomicBoolean(true);
28  		child.add(new Subscription() {
29  			@Override
30  			public void unsubscribe() {
31  				subscribed.set(false);
32  			}
33  
34  			@Override
35  			public boolean isUnsubscribed() {
36  				return !subscribed.get();
37  			}
38  		});
39  		Subscriber<Map<A, B>> parent = Subscribers
40  				.from(new Observer<Map<A, B>>() {
41  
42  					@Override
43  					public void onCompleted() {
44  						child.onCompleted();
45  					}
46  
47  					@Override
48  					public void onError(Throwable e) {
49  						child.onError(e);
50  					}
51  
52  					@Override
53  					public void onNext(Map<A, B> map) {
54  						log.info("emitting map with " + map.size() + " entries");
55  						for (Entry<A, B> entry : map.entrySet()) {
56  							if (subscribed.get()) {
57  								child.onNext(function.call(entry.getKey(),
58  										entry.getValue()));
59  							} else
60  								return;
61  						}
62  					}
63  				});
64  		child.add(parent);
65  		return parent;
66  	}
67  }