View Javadoc
1   package au.gov.amsa.craft.analyzer.wms;
2   
3   import static com.google.common.base.Optional.absent;
4   
5   import java.util.LinkedList;
6   import java.util.List;
7   
8   import rx.Observable.Operator;
9   import rx.Subscriber;
10  import au.gov.amsa.navigation.Identifier;
11  import au.gov.amsa.navigation.VesselPosition;
12  
13  import com.github.davidmoten.grumpy.core.Position;
14  import com.google.common.base.Optional;
15  
16  public class OperatorDriftDistanceCheck implements Operator<VesselPosition, VesselPosition> {
17  
18      protected static final double MIN_DISTANCE_THRESHOLD_KM = 5 * 1.852;
19  
20      @Override
21      public Subscriber<? super VesselPosition> call(final Subscriber<? super VesselPosition> child) {
22          return new Subscriber<VesselPosition>(child) {
23  
24              Optional<Position> min = absent();
25              Optional<Position> max = absent();
26              Optional<Identifier> id = absent();
27              List<VesselPosition> buffer = new LinkedList<VesselPosition>();
28  
29              @Override
30              public void onCompleted() {
31                  buffer.clear();
32                  child.onCompleted();
33              }
34  
35              @Override
36              public void onError(Throwable e) {
37                  buffer.clear();
38                  child.onError(e);
39              }
40  
41              @Override
42              public void onNext(VesselPosition vp) {
43                  if (!id.isPresent()
44                          || (id.isPresent() && vp.id().uniqueId() != id.get().uniqueId())
45                          || vp.data().get().equals(vp.time())) {
46                      min = Optional.of(Position.create(vp.lat(), vp.lon()));
47                      max = Optional.of(Position.create(vp.lat(), vp.lon()));
48                      id = Optional.of(vp.id());
49                      buffer.clear();
50                  } else {
51                      min = Optional.of(min(min.get(), Position.create(vp.lat(), vp.lon())));
52                      max = Optional.of(max(max.get(), Position.create(vp.lat(), vp.lon())));
53                  }
54                  buffer.add(vp);
55                  if (distanceKm(min.get(), max.get()) >= MIN_DISTANCE_THRESHOLD_KM) {
56                      for (VesselPosition p : buffer) {
57                          child.onNext(p);
58                      }
59                      buffer.clear();
60                  }
61              }
62  
63          };
64      }
65  
66      private double distanceKm(Position a, Position b) {
67          return a.getDistanceToKm(b);
68      }
69  
70      private static Position min(Position a, Position b) {
71          return Position.create(Math.min(a.getLat(), b.getLat()), Math.min(a.getLon(), b.getLon()));
72      }
73  
74      private static Position max(Position a, Position b) {
75          return Position.create(Math.max(a.getLat(), b.getLat()), Math.max(a.getLon(), b.getLon()));
76      }
77  }