1 package au.gov.amsa.craft.analyzer.wms;
2
3 import static com.google.common.base.Optional.absent;
4
5 import java.util.LinkedList;
6 import java.util.List;
7
8 import rx.Observable.Operator;
9 import rx.Subscriber;
10 import au.gov.amsa.navigation.Identifier;
11 import au.gov.amsa.navigation.VesselPosition;
12
13 import com.github.davidmoten.grumpy.core.Position;
14 import com.google.common.base.Optional;
15
16 public class OperatorDriftDistanceCheck implements Operator<VesselPosition, VesselPosition> {
17
18 protected static final double MIN_DISTANCE_THRESHOLD_KM = 5 * 1.852;
19
20 @Override
21 public Subscriber<? super VesselPosition> call(final Subscriber<? super VesselPosition> child) {
22 return new Subscriber<VesselPosition>(child) {
23
24 Optional<Position> min = absent();
25 Optional<Position> max = absent();
26 Optional<Identifier> id = absent();
27 List<VesselPosition> buffer = new LinkedList<VesselPosition>();
28
29 @Override
30 public void onCompleted() {
31 buffer.clear();
32 child.onCompleted();
33 }
34
35 @Override
36 public void onError(Throwable e) {
37 buffer.clear();
38 child.onError(e);
39 }
40
41 @Override
42 public void onNext(VesselPosition vp) {
43 if (!id.isPresent()
44 || (id.isPresent() && vp.id().uniqueId() != id.get().uniqueId())
45 || vp.data().get().equals(vp.time())) {
46 min = Optional.of(Position.create(vp.lat(), vp.lon()));
47 max = Optional.of(Position.create(vp.lat(), vp.lon()));
48 id = Optional.of(vp.id());
49 buffer.clear();
50 } else {
51 min = Optional.of(min(min.get(), Position.create(vp.lat(), vp.lon())));
52 max = Optional.of(max(max.get(), Position.create(vp.lat(), vp.lon())));
53 }
54 buffer.add(vp);
55 if (distanceKm(min.get(), max.get()) >= MIN_DISTANCE_THRESHOLD_KM) {
56 for (VesselPosition p : buffer) {
57 child.onNext(p);
58 }
59 buffer.clear();
60 }
61 }
62
63 };
64 }
65
66 private double distanceKm(Position a, Position b) {
67 return a.getDistanceToKm(b);
68 }
69
70 private static Position min(Position a, Position b) {
71 return Position.create(Math.min(a.getLat(), b.getLat()), Math.min(a.getLon(), b.getLon()));
72 }
73
74 private static Position max(Position a, Position b) {
75 return Position.create(Math.max(a.getLat(), b.getLat()), Math.max(a.getLon(), b.getLon()));
76 }
77 }