1 package au.gov.amsa.geo.distance;
2
3 import static au.gov.amsa.geo.distance.EffectiveSpeedChecker.effectiveSpeedOk;
4 import static com.google.common.base.Optional.of;
5 import rx.Observable.Operator;
6 import rx.Subscriber;
7 import au.gov.amsa.geo.model.SegmentOptions;
8 import au.gov.amsa.risky.format.Fix;
9
10 import com.google.common.base.Optional;
11
12
13
14
15
16
17
18 public class OperatorEffectiveSpeedChecker implements Operator<EffectiveSpeedCheck, Fix> {
19
20 private final SegmentOptions options;
21
22 public OperatorEffectiveSpeedChecker(SegmentOptions options) {
23 this.options = options;
24 }
25
26 @Override
27 public Subscriber<? super Fix> call(Subscriber<? super EffectiveSpeedCheck> child) {
28 Subscriber<Fix> parent = createSubscriber(child, options);
29 return parent;
30
31 }
32
33 private static Subscriber<Fix> createSubscriber(
34 final Subscriber<? super EffectiveSpeedCheck> child, final SegmentOptions options) {
35
36 return new Subscriber<Fix>(child) {
37
38
39
40
41 private Optional<Fix> previousFix = Optional.absent();
42
43
44
45
46 private Optional<Fix> first = Optional.absent();
47
48 @Override
49 public void onCompleted() {
50 child.onCompleted();
51 }
52
53 @Override
54 public void onError(Throwable e) {
55 child.onError(e);
56 }
57
58 @Override
59 public void onNext(Fix fix) {
60
61 if (!previousFix.isPresent()) {
62 if (!first.isPresent()) {
63
64
65
66
67
68 first = of(fix);
69
70
71 request(1);
72 } else if (effectiveSpeedOk(first.get(), fix, options)) {
73 previousFix = of(fix);
74 child.onNext(new EffectiveSpeedCheck(first.get(), true));
75 child.onNext(new EffectiveSpeedCheck(fix, true));
76 } else {
77 first = of(fix);
78
79
80 request(1);
81 }
82 } else if (effectiveSpeedOk(previousFix.get(), fix, options)) {
83 previousFix = of(fix);
84 child.onNext(new EffectiveSpeedCheck(fix, true));
85 } else {
86
87 child.onNext(new EffectiveSpeedCheck(fix, false));
88 }
89 }
90 };
91 }
92
93 }