View Javadoc
1   package au.gov.amsa.geo.distance;
2   
3   import static au.gov.amsa.geo.distance.EffectiveSpeedChecker.effectiveSpeedOk;
4   import static com.google.common.base.Optional.of;
5   import rx.Observable.Operator;
6   import rx.Subscriber;
7   import au.gov.amsa.geo.model.SegmentOptions;
8   import au.gov.amsa.risky.format.Fix;
9   
10  import com.google.common.base.Optional;
11  
12  /**
13   * Given a sequence of fixes the first fix is consider to be the first fix that
14   * passes an effective speed check with its following fix. Having established
15   * the first fix then following fixes are discarded from the sequence that fail
16   * the effective speed check with the last confirmed fix.
17   */
18  public class OperatorEffectiveSpeedChecker implements Operator<EffectiveSpeedCheck, Fix> {
19  
20      private final SegmentOptions options;
21  
22      public OperatorEffectiveSpeedChecker(SegmentOptions options) {
23          this.options = options;
24      }
25  
26      @Override
27      public Subscriber<? super Fix> call(Subscriber<? super EffectiveSpeedCheck> child) {
28          Subscriber<Fix> parent = createSubscriber(child, options);
29          return parent;
30  
31      }
32  
33      private static Subscriber<Fix> createSubscriber(
34              final Subscriber<? super EffectiveSpeedCheck> child, final SegmentOptions options) {
35  
36          return new Subscriber<Fix>(child) {
37  
38              /**
39               * The last emitted fix.
40               */
41              private Optional<Fix> previousFix = Optional.absent();
42  
43              /**
44               * The latest fix.
45               */
46              private Optional<Fix> first = Optional.absent();
47  
48              @Override
49              public void onCompleted() {
50                  child.onCompleted();
51              }
52  
53              @Override
54              public void onError(Throwable e) {
55                  child.onError(e);
56              }
57  
58              @Override
59              public void onNext(Fix fix) {
60  
61                  if (!previousFix.isPresent()) {
62                      if (!first.isPresent()) {
63                          // buffer the very first fix. It will get emitted only
64                          // if passes effective speed check with the following
65                          // fix. If it does not then the next fix will be
66                          // considered as the next candidate for being the first
67                          // fix.
68                          first = of(fix);
69                          // because no emission we request again to honour
70                          // backpressure
71                          request(1);
72                      } else if (effectiveSpeedOk(first.get(), fix, options)) {
73                          previousFix = of(fix);
74                          child.onNext(new EffectiveSpeedCheck(first.get(), true));
75                          child.onNext(new EffectiveSpeedCheck(fix, true));
76                      } else {
77                          first = of(fix);
78                          // because no emission we request again to honour
79                          // backpressure
80                          request(1);
81                      }
82                  } else if (effectiveSpeedOk(previousFix.get(), fix, options)) {
83                      previousFix = of(fix);
84                      child.onNext(new EffectiveSpeedCheck(fix, true));
85                  } else {
86                      // failed effective speed check
87                      child.onNext(new EffectiveSpeedCheck(fix, false));
88                  }
89              }
90          };
91      }
92  
93  }