1 package au.gov.amsa.craft.analyzer.wms; 2 3 import static com.google.common.base.Optional.absent; 4 5 import java.util.LinkedList; 6 import java.util.List; 7 8 import rx.Observable.Operator; 9 import rx.Subscriber; 10 import au.gov.amsa.navigation.Identifier; 11 import au.gov.amsa.navigation.VesselPosition; 12 13 import com.github.davidmoten.grumpy.core.Position; 14 import com.google.common.base.Optional; 15 16 public class OperatorDriftDistanceCheck implements Operator<VesselPosition, VesselPosition> { 17 18 protected static final double MIN_DISTANCE_THRESHOLD_KM = 5 * 1.852; 19 20 @Override 21 public Subscriber<? super VesselPosition> call(final Subscriber<? super VesselPosition> child) { 22 return new Subscriber<VesselPosition>(child) { 23 24 Optional<Position> min = absent(); 25 Optional<Position> max = absent(); 26 Optional<Identifier> id = absent(); 27 List<VesselPosition> buffer = new LinkedList<VesselPosition>(); 28 29 @Override 30 public void onCompleted() { 31 buffer.clear(); 32 child.onCompleted(); 33 } 34 35 @Override 36 public void onError(Throwable e) { 37 buffer.clear(); 38 child.onError(e); 39 } 40 41 @Override 42 public void onNext(VesselPosition vp) { 43 if (!id.isPresent() 44 || (id.isPresent() && vp.id().uniqueId() != id.get().uniqueId()) 45 || vp.data().get().equals(vp.time())) { 46 min = Optional.of(Position.create(vp.lat(), vp.lon())); 47 max = Optional.of(Position.create(vp.lat(), vp.lon())); 48 id = Optional.of(vp.id()); 49 buffer.clear(); 50 } else { 51 min = Optional.of(min(min.get(), Position.create(vp.lat(), vp.lon()))); 52 max = Optional.of(max(max.get(), Position.create(vp.lat(), vp.lon()))); 53 } 54 buffer.add(vp); 55 if (distanceKm(min.get(), max.get()) >= MIN_DISTANCE_THRESHOLD_KM) { 56 for (VesselPosition p : buffer) { 57 child.onNext(p); 58 } 59 buffer.clear(); 60 } 61 } 62 63 }; 64 } 65 66 private double distanceKm(Position a, Position b) { 67 return a.getDistanceToKm(b); 68 } 69 70 private static Position min(Position a, Position b) { 71 return Position.create(Math.min(a.getLat(), b.getLat()), Math.min(a.getLon(), b.getLon())); 72 } 73 74 private static Position max(Position a, Position b) { 75 return Position.create(Math.max(a.getLat(), b.getLat()), Math.max(a.getLon(), b.getLon())); 76 } 77 }