1 package au.gov.amsa.navigation;
2
3 import java.util.concurrent.TimeUnit;
4
5 import com.github.davidmoten.rx.StateMachine.Transition;
6 import com.github.davidmoten.rx.Transformers;
7 import com.google.common.annotations.VisibleForTesting;
8 import com.google.common.base.Preconditions;
9
10 import au.gov.amsa.risky.format.Fix;
11 import au.gov.amsa.risky.format.HasFix;
12 import au.gov.amsa.risky.format.NavigationalStatus;
13 import rx.Observable;
14 import rx.Observable.Transformer;
15 import rx.Subscriber;
16 import rx.functions.Func1;
17
18 public class DriftDetector {
19
20 public static Observable<DriftCandidate> getCandidates(Observable<HasFix> o, Options options) {
21 return o.compose(new DriftDetectorTransformer(options));
22 }
23
24 public static DriftDetectorTransformer detectDrift() {
25 return new DriftDetectorTransformer(Options.instance());
26 }
27
28 public static DriftDetectorTransformer detectDrift(Options options) {
29 return new DriftDetectorTransformer(options);
30 }
31
32 public static class DriftDetectorTransformer implements Transformer<HasFix, DriftCandidate> {
33
34
35
36
37
38
39 private static final long NOT_DRIFTING = Long.MAX_VALUE;
40 private static final long MMSI_NOT_SET = 0;
41
42 private final Options options;
43 private final Func1<Fix, Boolean> isCandidate;
44
45 public DriftDetectorTransformer(Options options) {
46 this.options = options;
47 this.isCandidate = isCandidate(options);
48 }
49
50 @Override
51 public Observable<DriftCandidate> call(Observable<HasFix> o) {
52 Transformer<HasFix, DriftCandidate> t = Transformers.stateMachine()
53 .initialState(new State(options))
54 .transition(new Transition<State, HasFix, DriftCandidate>() {
55
56 @Override
57 public State call(State state, HasFix value,
58 Subscriber<DriftCandidate> subscriber) {
59 state.onNext(value, subscriber, isCandidate);
60 return state;
61 }
62 })
63 .build();
64 return o.compose(t);
65 }
66
67
68 private static final class State {
69 Item a;
70 Item b;
71 long driftingSince = NOT_DRIFTING;
72 long mmsi = 0;
73 private final Options options;
74
75 public State(Options options) {
76 this.options = options;
77 }
78
79 public void onNext(HasFix f, Subscriber<DriftCandidate> subscriber,
80 Func1<Fix, Boolean> isCandidate) {
81 try {
82
83
84
85 Fix fix = f.fix();
86
87 if (mmsi != MMSI_NOT_SET && fix.mmsi() != mmsi) {
88
89 a = null;
90 b = null;
91 driftingSince = NOT_DRIFTING;
92 }
93 mmsi = fix.mmsi();
94
95 if (outOfTimeOrder(fix)) {
96 return;
97 }
98
99 final Item item;
100 if (isCandidate.call(fix)) {
101 item = new Drifter(f, false);
102 } else
103 item = new NonDrifter(fix.time());
104 if (a == null) {
105 a = item;
106 processAB(subscriber);
107 } else if (b == null) {
108 b = item;
109 processAB(subscriber);
110 } else {
111 processABC(item, subscriber);
112 }
113 } catch (RuntimeException e) {
114 subscriber.onError(e);
115 }
116 }
117
118 private boolean outOfTimeOrder(Fix fix) {
119 if (b != null && fix.time() < b.time())
120 return true;
121 else if (a != null && fix.time() < a.time())
122 return true;
123 else
124 return false;
125 }
126
127 private void processABC(Item c, Subscriber<DriftCandidate> subscriber) {
128 if (isDrifter(a) && !isDrifter(b) && !isDrifter(c)) {
129
130
131 } else if (isDrifter(a) && !isDrifter(b) && isDrifter(c)) {
132
133 if (withinNonDriftingThreshold(b, c)) {
134 b = c;
135 processAB(subscriber);
136 } else {
137 a = c;
138 b = null;
139 }
140 } else {
141 System.out.println(a + "," + b + "," + c);
142 unexpected();
143 }
144 }
145
146 private void unexpected() {
147 throw new RuntimeException("unexpected");
148 }
149
150 private void processAB(Subscriber<DriftCandidate> subscriber) {
151 if (!isDrifter(a)) {
152
153 a = null;
154 if (b != null)
155 unexpected();
156 } else if (b == null) {
157
158 } else if (!a.emitted()) {
159 if (isDrifter(b)) {
160
161 if (!expired(a, b)) {
162 driftingSince = a.time();
163 subscriber.onNext(new DriftCandidate(a.fix(), a.time()));
164 subscriber.onNext(new DriftCandidate(b.fix(), a.time()));
165
166 a = new Drifter(a.fix(), true);
167 b = null;
168 } else {
169 a = b;
170 b = null;
171 }
172 }
173 } else {
174
175
176 if (isDrifter(b)) {
177 if (!expired(a, b)) {
178 subscriber.onNext(new DriftCandidate(b.fix(), driftingSince));
179 a = new Drifter(b.fix(), true);
180 b = null;
181 } else {
182 a = b;
183 b = null;
184 }
185 }
186 }
187 }
188
189 private boolean expired(Item a, Item b) {
190 return b.time() - a.time() >= options.expiryAgeMs();
191 }
192
193 private boolean withinNonDriftingThreshold(Item a, Item b) {
194 return b.time() - a.time() < options.nonDriftingThresholdMs();
195 }
196
197 }
198
199 private static boolean isDrifter(Item item) {
200 return item instanceof Drifter;
201 }
202
203 private static interface Item {
204 long time();
205
206 HasFix fix();
207
208 boolean emitted();
209 }
210
211 private static class Drifter implements Item {
212
213 private final HasFix fix;
214 private final boolean emitted;
215
216 Drifter(HasFix fix, boolean emitted) {
217 this.fix = fix;
218 this.emitted = emitted;
219 }
220
221 @Override
222 public long time() {
223 return fix.fix().time();
224 }
225
226 @Override
227 public HasFix fix() {
228 return fix;
229 }
230
231 @Override
232 public boolean emitted() {
233 return emitted;
234 }
235
236 }
237
238 private static class NonDrifter implements Item {
239
240 private final long time;
241
242 NonDrifter(long time) {
243 this.time = time;
244 }
245
246 @Override
247 public long time() {
248 return time;
249 }
250
251 @Override
252 public Fix fix() {
253 throw new RuntimeException("unexpected");
254 }
255
256 @Override
257 public boolean emitted() {
258
259 return false;
260 }
261
262 }
263
264 }
265
266 @VisibleForTesting
267 static Func1<Fix, Boolean> isCandidate(Options options) {
268 return f -> {
269 if (f.courseOverGroundDegrees().isPresent() && f.headingDegrees().isPresent()
270 && f.speedOverGroundKnots().isPresent()
271 && (!f.navigationalStatus().isPresent()
272 || (f.navigationalStatus().get() != NavigationalStatus.AT_ANCHOR && f
273 .navigationalStatus().get() != NavigationalStatus.MOORED))) {
274 double diff = diff(f.courseOverGroundDegrees().get(), f.headingDegrees().get());
275 return diff >= options.minHeadingCogDifference()
276 && diff <= options.maxHeadingCogDifference()
277 && f.speedOverGroundKnots().get() <= options.maxDriftingSpeedKnots()
278 && f.speedOverGroundKnots().get() > options.minDriftingSpeedKnots();
279 } else
280 return false;
281 };
282 }
283
284 static double diff(double a, double b) {
285 Preconditions.checkArgument(a >= 0 && a < 360);
286 Preconditions.checkArgument(b >= 0 && b < 360);
287 double value;
288 if (a < b)
289 value = a + 360 - b;
290 else
291 value = a - b;
292 if (value > 180)
293 return 360 - value;
294 else
295 return value;
296
297 };
298
299 public static final class Options {
300
301 @VisibleForTesting
302 static final int DEFAULT_HEADING_COG_DIFFERENCE_MIN = 45;
303 @VisibleForTesting
304 static final int DEFAULT_HEADING_COG_DIFFERENCE_MAX = 135;
305 @VisibleForTesting
306 static final float DEFAULT_MIN_DRIFTING_SPEED_KNOTS = 0.25f;
307 @VisibleForTesting
308 static final float DEFAULT_MAX_DRIFTING_SPEED_KNOTS = 20;
309 private static final long DEFAULT_EXPIRY_AGE_MS = TimeUnit.HOURS.toMillis(6);
310 private static final long DEFAULT_NON_DRIFTING_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(5);
311
312 private final int minHeadingCogDifference;
313 private final int maxHeadingCogDifference;
314 private final float minDriftingSpeedKnots;
315 private final float maxDriftingSpeedKnots;
316 private final long expiryAgeMs;
317 private final long nonDriftingThresholdMs;
318
319 private static class Holder {
320
321 static Options INSTANCE = new Options(DEFAULT_HEADING_COG_DIFFERENCE_MIN,
322 DEFAULT_HEADING_COG_DIFFERENCE_MAX, DEFAULT_MIN_DRIFTING_SPEED_KNOTS,
323 DEFAULT_MAX_DRIFTING_SPEED_KNOTS, DEFAULT_EXPIRY_AGE_MS,
324 DEFAULT_NON_DRIFTING_THRESHOLD_MS);
325 }
326
327 public static Options instance() {
328 return Holder.INSTANCE;
329 }
330
331 public Options(int minHeadingCogDifference, int maxHeadingCogDifference,
332 float minDriftingSpeedKnots, float maxDriftingSpeedKnots, long expiryAgeMs,
333 long nonDriftingThresholdMs) {
334 Preconditions.checkArgument(minHeadingCogDifference >= 0);
335 Preconditions.checkArgument(minDriftingSpeedKnots >= 0);
336 Preconditions.checkArgument(minHeadingCogDifference <= maxHeadingCogDifference);
337 Preconditions.checkArgument(minDriftingSpeedKnots <= maxDriftingSpeedKnots);
338 Preconditions.checkArgument(expiryAgeMs > 0);
339 Preconditions.checkArgument(nonDriftingThresholdMs >= 0);
340 this.minHeadingCogDifference = minHeadingCogDifference;
341 this.maxHeadingCogDifference = maxHeadingCogDifference;
342 this.minDriftingSpeedKnots = minDriftingSpeedKnots;
343 this.maxDriftingSpeedKnots = maxDriftingSpeedKnots;
344 this.expiryAgeMs = expiryAgeMs;
345 this.nonDriftingThresholdMs = nonDriftingThresholdMs;
346 }
347
348 public int maxHeadingCogDifference() {
349 return maxHeadingCogDifference;
350 }
351
352 public int minHeadingCogDifference() {
353 return minHeadingCogDifference;
354 }
355
356 public float maxDriftingSpeedKnots() {
357 return maxDriftingSpeedKnots;
358 }
359
360 public float minDriftingSpeedKnots() {
361 return minDriftingSpeedKnots;
362 }
363
364 public long expiryAgeMs() {
365 return expiryAgeMs;
366 }
367
368 public long nonDriftingThresholdMs() {
369 return nonDriftingThresholdMs;
370 }
371
372 @Override
373 public String toString() {
374 StringBuilder b = new StringBuilder();
375 b.append("Options [minHeadingCogDifference=");
376 b.append(minHeadingCogDifference);
377 b.append(", maxHeadingCogDifference=");
378 b.append(maxHeadingCogDifference);
379 b.append(", minDriftingSpeedKnots=");
380 b.append(minDriftingSpeedKnots);
381 b.append(", maxDriftingSpeedKnots=");
382 b.append(maxDriftingSpeedKnots);
383 b.append(", expiryAgeMs=");
384 b.append(expiryAgeMs);
385 b.append(", nonDriftingThresholdMs=");
386 b.append(nonDriftingThresholdMs);
387 b.append("]");
388 return b.toString();
389 }
390
391 }
392
393 }