View Javadoc
1   package au.gov.amsa.craft.analyzer.wms;
2   
3   import rx.Observable.Operator;
4   import rx.Producer;
5   import rx.Subscriber;
6   
7   public class OperatorBackpressureChecker<T> implements Operator<T, T> {
8   
9       @Override
10      public Subscriber<? super T> call(Subscriber<? super T> child) {
11          final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
12  
13          child.setProducer(new Producer() {
14              @Override
15              public void request(long n) {
16                  parent.addRequest(n);
17                  parent.requestMore(n);
18              }
19          });
20  
21          return parent;
22      }
23  
24      private final static class ParentSubscriber<T> extends Subscriber<T> {
25          private final Subscriber<? super T> child;
26          private long countEmitted = 0;
27          private long countRequests = 0;
28  
29          private ParentSubscriber(Subscriber<? super T> child) {
30              this.child = child;
31          }
32  
33          public void addRequest(long n) {
34              countRequests += n;
35          }
36  
37          private void requestMore(long n) {
38              request(n);
39          }
40  
41          @Override
42          public void onCompleted() {
43              child.onCompleted();
44          }
45  
46          @Override
47          public void onError(Throwable e) {
48              child.onError(e);
49          }
50  
51          @Override
52          public void onNext(T t) {
53              countEmitted++;
54              if (countEmitted > countRequests) {
55                  child.onError(new RuntimeException("emissions exceeded requested amount!"));
56                  //eagerly unsubscribe
57                  unsubscribe();
58              }
59              else
60                  child.onNext(t);
61          }
62  
63      }
64  
65  }