1 package au.gov.amsa.craft.analyzer.wms;
2
3 import rx.Observable.Operator;
4 import rx.Producer;
5 import rx.Subscriber;
6
7 public class OperatorBackpressureChecker<T> implements Operator<T, T> {
8
9 @Override
10 public Subscriber<? super T> call(Subscriber<? super T> child) {
11 final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
12
13 child.setProducer(new Producer() {
14 @Override
15 public void request(long n) {
16 parent.addRequest(n);
17 parent.requestMore(n);
18 }
19 });
20
21 return parent;
22 }
23
24 private final static class ParentSubscriber<T> extends Subscriber<T> {
25 private final Subscriber<? super T> child;
26 private long countEmitted = 0;
27 private long countRequests = 0;
28
29 private ParentSubscriber(Subscriber<? super T> child) {
30 this.child = child;
31 }
32
33 public void addRequest(long n) {
34 countRequests += n;
35 }
36
37 private void requestMore(long n) {
38 request(n);
39 }
40
41 @Override
42 public void onCompleted() {
43 child.onCompleted();
44 }
45
46 @Override
47 public void onError(Throwable e) {
48 child.onError(e);
49 }
50
51 @Override
52 public void onNext(T t) {
53 countEmitted++;
54 if (countEmitted > countRequests) {
55 child.onError(new RuntimeException("emissions exceeded requested amount!"));
56
57 unsubscribe();
58 }
59 else
60 child.onNext(t);
61 }
62
63 }
64
65 }