View Javadoc
1   package au.gov.amsa.navigation;
2   
3   import java.util.concurrent.TimeUnit;
4   
5   import com.github.davidmoten.rx.StateMachine.Transition;
6   import com.github.davidmoten.rx.Transformers;
7   import com.google.common.annotations.VisibleForTesting;
8   import com.google.common.base.Preconditions;
9   
10  import au.gov.amsa.risky.format.Fix;
11  import au.gov.amsa.risky.format.HasFix;
12  import au.gov.amsa.risky.format.NavigationalStatus;
13  import rx.Observable;
14  import rx.Observable.Transformer;
15  import rx.Subscriber;
16  import rx.functions.Func1;
17  
18  public class DriftDetector {
19  
20      public static Observable<DriftCandidate> getCandidates(Observable<HasFix> o, Options options) {
21          return o.compose(new DriftDetectorTransformer(options));
22      }
23  
24      public static DriftDetectorTransformer detectDrift() {
25          return new DriftDetectorTransformer(Options.instance());
26      }
27  
28      public static DriftDetectorTransformer detectDrift(Options options) {
29          return new DriftDetectorTransformer(options);
30      }
31  
32      public static class DriftDetectorTransformer implements Transformer<HasFix, DriftCandidate> {
33  
34          // because many of these are expected to be in existence simultaneously
35          // (one per vessel and between 30,000 and 40,000 vessels may appear in
36          // our coverage annually, we need to be nice and careful with how much
37          // memory this operator uses.
38  
39          private static final long NOT_DRIFTING = Long.MAX_VALUE;
40          private static final long MMSI_NOT_SET = 0;
41  
42          private final Options options;
43          private final Func1<Fix, Boolean> isCandidate;
44  
45          public DriftDetectorTransformer(Options options) {
46              this.options = options;
47              this.isCandidate = isCandidate(options);
48          }
49  
50          @Override
51          public Observable<DriftCandidate> call(Observable<HasFix> o) {
52              Transformer<HasFix, DriftCandidate> t = Transformers.stateMachine()
53                      .initialState(new State(options)) //
54                      .transition(new Transition<State, HasFix, DriftCandidate>() {
55  
56                          @Override
57                          public State call(State state, HasFix value,
58                                  Subscriber<DriftCandidate> subscriber) {
59                              state.onNext(value, subscriber, isCandidate);
60                              return state;
61                          }
62                      }) //
63                      .build();
64              return o.compose(t);
65          }
66  
67          // mutable class but is mutated serially (google Observable contract)
68          private static final class State {
69              Item a;
70              Item b;
71              long driftingSince = NOT_DRIFTING;
72              long mmsi = 0;
73              private final Options options;
74  
75              public State(Options options) {
76                  this.options = options;
77              }
78  
79              public void onNext(HasFix f, Subscriber<DriftCandidate> subscriber,
80                      Func1<Fix, Boolean> isCandidate) {
81                  try {
82                      // Note that it is assumed that the input stream is grouped
83                      // by
84                      // mmsi and sorted by ascending time.
85                      Fix fix = f.fix();
86  
87                      if (mmsi != MMSI_NOT_SET && fix.mmsi() != mmsi) {
88                          // reset for a new vessel
89                          a = null;
90                          b = null;
91                          driftingSince = NOT_DRIFTING;
92                      }
93                      mmsi = fix.mmsi();
94  
95                      if (outOfTimeOrder(fix)) {
96                          return;
97                      }
98  
99                      final Item item;
100                     if (isCandidate.call(fix)) {
101                         item = new Drifter(f, false);
102                     } else
103                         item = new NonDrifter(fix.time());
104                     if (a == null) {
105                         a = item;
106                         processAB(subscriber);
107                     } else if (b == null) {
108                         b = item;
109                         processAB(subscriber);
110                     } else {
111                         processABC(item, subscriber);
112                     }
113                 } catch (RuntimeException e) {
114                     subscriber.onError(e);
115                 }
116             }
117 
118             private boolean outOfTimeOrder(Fix fix) {
119                 if (b != null && fix.time() < b.time())
120                     return true;
121                 else if (a != null && fix.time() < a.time())
122                     return true;
123                 else
124                     return false;
125             }
126 
127             private void processABC(Item c, Subscriber<DriftCandidate> subscriber) {
128                 if (isDrifter(a) && !isDrifter(b) && !isDrifter(c)) {
129                     // ignore c
130                     // rule 4, 5
131                 } else if (isDrifter(a) && !isDrifter(b) && isDrifter(c)) {
132                     // rule 6, 7
133                     if (withinNonDriftingThreshold(b, c)) {
134                         b = c;
135                         processAB(subscriber);
136                     } else {
137                         a = c;
138                         b = null;
139                     }
140                 } else {
141                     System.out.println(a + "," + b + "," + c);
142                     unexpected();
143                 }
144             }
145 
146             private void unexpected() {
147                 throw new RuntimeException("unexpected");
148             }
149 
150             private void processAB(Subscriber<DriftCandidate> subscriber) {
151                 if (!isDrifter(a)) {
152                     // rule 1
153                     a = null;
154                     if (b != null)
155                         unexpected();
156                 } else if (b == null) {
157                     // do nothing
158                 } else if (!a.emitted()) {
159                     if (isDrifter(b)) {
160                         // rule 2
161                         if (!expired(a, b)) {
162                             driftingSince = a.time();
163                             subscriber.onNext(new DriftCandidate(a.fix(), a.time()));
164                             subscriber.onNext(new DriftCandidate(b.fix(), a.time()));
165                             // mark as emitted
166                             a = new Drifter(a.fix(), true);
167                             b = null;
168                         } else {
169                             a = b;
170                             b = null;
171                         }
172                     }
173                 } else {
174                     // a has been emitted
175                     // rule 3
176                     if (isDrifter(b)) {
177                         if (!expired(a, b)) {
178                             subscriber.onNext(new DriftCandidate(b.fix(), driftingSince));
179                             a = new Drifter(b.fix(), true);
180                             b = null;
181                         } else {
182                             a = b;
183                             b = null;
184                         }
185                     }
186                 }
187             }
188 
189             private boolean expired(Item a, Item b) {
190                 return b.time() - a.time() >= options.expiryAgeMs();
191             }
192 
193             private boolean withinNonDriftingThreshold(Item a, Item b) {
194                 return b.time() - a.time() < options.nonDriftingThresholdMs();
195             }
196 
197         }
198 
199         private static boolean isDrifter(Item item) {
200             return item instanceof Drifter;
201         }
202 
203         private static interface Item {
204             long time();
205 
206             HasFix fix();
207 
208             boolean emitted();
209         }
210 
211         private static class Drifter implements Item {
212 
213             private final HasFix fix;
214             private final boolean emitted;
215 
216             Drifter(HasFix fix, boolean emitted) {
217                 this.fix = fix;
218                 this.emitted = emitted;
219             }
220 
221             @Override
222             public long time() {
223                 return fix.fix().time();
224             }
225 
226             @Override
227             public HasFix fix() {
228                 return fix;
229             }
230 
231             @Override
232             public boolean emitted() {
233                 return emitted;
234             }
235 
236         }
237 
238         private static class NonDrifter implements Item {
239 
240             private final long time;
241 
242             NonDrifter(long time) {
243                 this.time = time;
244             }
245 
246             @Override
247             public long time() {
248                 return time;
249             }
250 
251             @Override
252             public Fix fix() {
253                 throw new RuntimeException("unexpected");
254             }
255 
256             @Override
257             public boolean emitted() {
258                 // never gets emitted
259                 return false;
260             }
261 
262         }
263 
264     }
265 
266     @VisibleForTesting
267     static Func1<Fix, Boolean> isCandidate(Options options) {
268         return f -> {
269             if (f.courseOverGroundDegrees().isPresent() && f.headingDegrees().isPresent()
270                     && f.speedOverGroundKnots().isPresent()
271                     && (!f.navigationalStatus().isPresent()
272                             || (f.navigationalStatus().get() != NavigationalStatus.AT_ANCHOR && f
273                                     .navigationalStatus().get() != NavigationalStatus.MOORED))) {
274                 double diff = diff(f.courseOverGroundDegrees().get(), f.headingDegrees().get());
275                 return diff >= options.minHeadingCogDifference()
276                         && diff <= options.maxHeadingCogDifference()
277                         && f.speedOverGroundKnots().get() <= options.maxDriftingSpeedKnots()
278                         && f.speedOverGroundKnots().get() > options.minDriftingSpeedKnots();
279             } else
280                 return false;
281         };
282     }
283 
284     static double diff(double a, double b) {
285         Preconditions.checkArgument(a >= 0 && a < 360);
286         Preconditions.checkArgument(b >= 0 && b < 360);
287         double value;
288         if (a < b)
289             value = a + 360 - b;
290         else
291             value = a - b;
292         if (value > 180)
293             return 360 - value;
294         else
295             return value;
296 
297     };
298 
299     public static final class Options {
300 
301         @VisibleForTesting
302         static final int DEFAULT_HEADING_COG_DIFFERENCE_MIN = 45;
303         @VisibleForTesting
304         static final int DEFAULT_HEADING_COG_DIFFERENCE_MAX = 135;
305         @VisibleForTesting
306         static final float DEFAULT_MIN_DRIFTING_SPEED_KNOTS = 0.25f;
307         @VisibleForTesting
308         static final float DEFAULT_MAX_DRIFTING_SPEED_KNOTS = 20;
309         private static final long DEFAULT_EXPIRY_AGE_MS = TimeUnit.HOURS.toMillis(6);
310         private static final long DEFAULT_NON_DRIFTING_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(5);
311 
312         private final int minHeadingCogDifference;
313         private final int maxHeadingCogDifference;
314         private final float minDriftingSpeedKnots;
315         private final float maxDriftingSpeedKnots;
316         private final long expiryAgeMs;
317         private final long nonDriftingThresholdMs;
318 
319         private static class Holder {
320 
321             static Options INSTANCE = new Options(DEFAULT_HEADING_COG_DIFFERENCE_MIN,
322                     DEFAULT_HEADING_COG_DIFFERENCE_MAX, DEFAULT_MIN_DRIFTING_SPEED_KNOTS,
323                     DEFAULT_MAX_DRIFTING_SPEED_KNOTS, DEFAULT_EXPIRY_AGE_MS,
324                     DEFAULT_NON_DRIFTING_THRESHOLD_MS);
325         }
326 
327         public static Options instance() {
328             return Holder.INSTANCE;
329         }
330 
331         public Options(int minHeadingCogDifference, int maxHeadingCogDifference,
332                 float minDriftingSpeedKnots, float maxDriftingSpeedKnots, long expiryAgeMs,
333                 long nonDriftingThresholdMs) {
334             Preconditions.checkArgument(minHeadingCogDifference >= 0);
335             Preconditions.checkArgument(minDriftingSpeedKnots >= 0);
336             Preconditions.checkArgument(minHeadingCogDifference <= maxHeadingCogDifference);
337             Preconditions.checkArgument(minDriftingSpeedKnots <= maxDriftingSpeedKnots);
338             Preconditions.checkArgument(expiryAgeMs > 0);
339             Preconditions.checkArgument(nonDriftingThresholdMs >= 0);
340             this.minHeadingCogDifference = minHeadingCogDifference;
341             this.maxHeadingCogDifference = maxHeadingCogDifference;
342             this.minDriftingSpeedKnots = minDriftingSpeedKnots;
343             this.maxDriftingSpeedKnots = maxDriftingSpeedKnots;
344             this.expiryAgeMs = expiryAgeMs;
345             this.nonDriftingThresholdMs = nonDriftingThresholdMs;
346         }
347 
348         public int maxHeadingCogDifference() {
349             return maxHeadingCogDifference;
350         }
351 
352         public int minHeadingCogDifference() {
353             return minHeadingCogDifference;
354         }
355 
356         public float maxDriftingSpeedKnots() {
357             return maxDriftingSpeedKnots;
358         }
359 
360         public float minDriftingSpeedKnots() {
361             return minDriftingSpeedKnots;
362         }
363 
364         public long expiryAgeMs() {
365             return expiryAgeMs;
366         }
367 
368         public long nonDriftingThresholdMs() {
369             return nonDriftingThresholdMs;
370         }
371 
372         @Override
373         public String toString() {
374             StringBuilder b = new StringBuilder();
375             b.append("Options [minHeadingCogDifference=");
376             b.append(minHeadingCogDifference);
377             b.append(", maxHeadingCogDifference=");
378             b.append(maxHeadingCogDifference);
379             b.append(", minDriftingSpeedKnots=");
380             b.append(minDriftingSpeedKnots);
381             b.append(", maxDriftingSpeedKnots=");
382             b.append(maxDriftingSpeedKnots);
383             b.append(", expiryAgeMs=");
384             b.append(expiryAgeMs);
385             b.append(", nonDriftingThresholdMs=");
386             b.append(nonDriftingThresholdMs);
387             b.append("]");
388             return b.toString();
389         }
390 
391     }
392 
393 }