1 package au.gov.amsa.geo;
2
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;

import com.github.davidmoten.grumpy.core.Position;
import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import com.github.davidmoten.rx.Checked;
import com.google.common.base.Preconditions;

import au.gov.amsa.geo.distance.EffectiveSpeedCheck;
import au.gov.amsa.geo.distance.OperatorEffectiveSpeedChecker;
import au.gov.amsa.geo.model.SegmentOptions;
import au.gov.amsa.gt.Shapefile;
import au.gov.amsa.risky.format.BinaryFixes;
import au.gov.amsa.risky.format.BinaryFixesFormat;
import au.gov.amsa.risky.format.Fix;
import au.gov.amsa.streams.Strings;
import au.gov.amsa.util.Files;
import rx.Observable;
47
48 public final class VoyageDatasetProducer {
49
/**
 * Separator used both to join the fields of each leg line written by write and to split the
 * lines of the /eez-waypoints.csv resource.
 */
private static final String COMMA = ",";
/**
 * Formatter for the start and end times written for each leg (minute resolution). The trailing
 * 'Z' is a literal in the pattern; formatTime supplies UTC-zoned values to it.
 */
private static final DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm'Z'");
52
53 public static void produce(File output, File fixesOutput, List<File> list) throws Exception {
54
55 output.delete();
56 FileUtils.deleteDirectory(fixesOutput);
57
58 int numFiles = list.size();
59 System.out.println(numFiles + "binary fix files");
60
61 AtomicInteger fileNumber = new AtomicInteger(0);
62 Collection<Port> ports = loadPorts();
63 Collection<EezWaypoint> eezWaypoints = readEezWaypoints();
64 Shapefile eezLine = Eez.loadEezLine();
65 Shapefile eezPolygon = Eez.loadEezPolygon();
66 System.out.println("loaded eez shapefiles");
67 long t = System.currentTimeMillis();
68 AtomicLong failedCheck = new AtomicLong();
69 AtomicLong fixCount = new AtomicLong();
70 Map<Integer, Integer> mmsisWithFailedChecks = new TreeMap<>();
71 Persister persister = new Persister(fixesOutput);
72
73 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(output)))) {
74
75
76
77 Observable.from(list)
78 .groupBy(f -> mmsiFromFilename(f))
79 .flatMap(files -> {
80 String mmsi = files.getKey();
81 if (!isShipMmsi(mmsi)) {
82 return Observable.empty();
83 } else {
84 return files
85 .compose(o -> logPercentCompleted(numFiles, t, o, fileNumber))
86 .concatMap(BinaryFixes::from)
87 .lift(new OperatorEffectiveSpeedChecker(
88 SegmentOptions.builder().acceptAnyFixHours(24L).maxSpeedKnots(50).build()))
89 .doOnNext(
90 check -> updatedCounts(failedCheck, fixCount, mmsisWithFailedChecks, check))
91 .filter(check -> check.isOk())
92 .map(check -> check.fix())
93 .doOnNext(fix -> persister.persist(fix))
94 .compose(o -> toLegs(eezLine, eezPolygon, ports, eezWaypoints, o))
95 .filter(x -> includeLeg(x));
96 }
97 }
98
99 )
100 .sorted((a, b) -> compareByMmsiThenLegStartTime(a, b))
101 .doOnNext(x -> write(writer, x))
102 .doOnTerminate(Checked.a0(() -> persister.close()))
103 .toBlocking()
104 .subscribe();
105 System.out.println((System.currentTimeMillis() - t) + "ms");
106 System.out.println("total fixes=" + fixCount.get());
107 System.out.println("num fixes rejected due failed effective speed check=" + failedCheck.get());
108 System.out.println("num mmsis with failed effective speed checks=" + mmsisWithFailedChecks.size());
109
110 try (PrintStream p = new PrintStream("target/info.txt")) {
111 p.println("total fixes=" + fixCount.get());
112 p.println("num fixes rejected due failed effective speed check=" + failedCheck.get());
113 p.println("num mmsis with failed effective speed checks=" + mmsisWithFailedChecks.size());
114 }
115 try (PrintStream p = new PrintStream("target/failures.txt")) {
116 p.println("failures mmsi <TAB> number of rejected fixes");
117 for (Integer mmsi : mmsisWithFailedChecks.keySet()) {
118 p.println(mmsi + "\t" + mmsisWithFailedChecks.get(mmsi));
119 }
120 }
121 }
122 }
123
124
125 final static class Persister implements Closeable {
126
127 private final File outputDirectory;
128 private OutputStream currentPersistOutputStream;
129 private int currentPersistMmsi = -1;
130
131 public Persister(File outputDirectory) {
132 this.outputDirectory = outputDirectory;
133 outputDirectory.mkdirs();
134 }
135
136
137
138
139
140
141
142 void persist(Fix fix) {
143
144
145 if (fix.mmsi() != currentPersistMmsi) {
146 currentPersistMmsi = fix.mmsi();
147 if (currentPersistOutputStream != null) {
148 try {
149 currentPersistOutputStream.close();
150 } catch (IOException e) {
151 throw new RuntimeException(e);
152 }
153 }
154 try {
155 currentPersistOutputStream = new BufferedOutputStream(
156 new FileOutputStream(new File(outputDirectory, fix.mmsi() + ".track"), true));
157 } catch (FileNotFoundException e) {
158 throw new RuntimeException(e);
159 }
160
161 }
162 BinaryFixes.write(fix, currentPersistOutputStream, BinaryFixesFormat.WITHOUT_MMSI);
163 }
164
165 @Override
166 public void close() throws IOException {
167 if (currentPersistOutputStream != null) {
168 currentPersistOutputStream.close();
169 }
170
171 }
172 }
173
174 private static String mmsiFromFilename(File f) {
175 return f.getName().substring(0, f.getName().indexOf("."));
176 }
177
178 private static boolean isShipMmsi(String m) {
179
180
181
182
183
184 return (m.length() == 9 && !m.startsWith("0") && !m.startsWith("1") && !m.startsWith("8")
185 && !m.startsWith("9"));
186 }
187
188 private static int compareByMmsiThenLegStartTime(TimedLeg x, TimedLeg y) {
189 if (x.mmsi == y.mmsi) {
190
191 return Long.compare(x.a.time, y.a.time);
192 } else {
193 return Integer.compare(x.mmsi, y.mmsi);
194 }
195 }
196
197 private static void updatedCounts(AtomicLong failedCheck, AtomicLong fixCount,
198 Map<Integer, Integer> mmsisWithFailedChecks, EffectiveSpeedCheck check) {
199 fixCount.incrementAndGet();
200 if (!check.isOk()) {
201 int count = mmsisWithFailedChecks.getOrDefault(check.fix().mmsi(), 0);
202 mmsisWithFailedChecks.put(check.fix().mmsi(), count + 1);
203 failedCheck.incrementAndGet();
204 }
205 }
206
207 private static void write(BufferedWriter writer, TimedLeg x) {
208 try {
209 writer.write(String.valueOf(x.mmsi));
210 writer.write(COMMA);
211 writer.write(formatTime(x.a.time));
212 writer.write(COMMA);
213 writer.write(x.a.waypoint.code());
214 writer.write(COMMA);
215 writer.write(formatTime(x.b.time));
216 writer.write(COMMA);
217 writer.write(x.b.waypoint.code());
218 writer.write("\n");
219 } catch (Exception e) {
220 throw new RuntimeException(e);
221 }
222 }
223
224 private static boolean includeLeg(TimedLeg x) {
225
226 return !(x.a.waypoint instanceof EezWaypoint && x.b.waypoint instanceof EezWaypoint);
227 }
228
229 private static String formatTime(long t) {
230 return format.format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(t), ZoneOffset.UTC));
231 }
232
233 private static Collection<EezWaypoint> readEezWaypoints() throws IOException {
234 Collection<EezWaypoint> eezWaypoints;
235 try (Reader reader = new InputStreamReader(
236 VoyageDatasetProducer.class.getResourceAsStream("/eez-waypoints.csv"))) {
237 eezWaypoints = Strings.lines(reader)
238 .map(line -> line.trim())
239 .filter(line -> line.length() > 0)
240 .filter(line -> !line.startsWith("#"))
241 .map(line -> line.split(COMMA))
242 .map(items -> new EezWaypoint(items[0], Double.parseDouble(items[2]), Double.parseDouble(items[1]),
243
244 Optional.empty()))
245 .doOnNext(System.out::println)
246 .toList()
247 .toBlocking().single();
248 }
249 return eezWaypoints;
250 }
251
252 static Collection<Port> loadPorts() throws IOException {
253 Collection<Port> ports;
254 try (Reader reader = new InputStreamReader(VoyageDatasetProducer.class.getResourceAsStream("/ports.txt"))) {
255 ports = Strings.lines(reader)
256 .map(line -> line.trim())
257 .filter(line -> line.length() > 0)
258 .filter(line -> !line.startsWith("#"))
259 .map(line -> line.split("\t"))
260 .map(items -> new Port(items[0], items[1],
261 Shapefile.fromZip(VoyageDatasetProducer.class
262 .getResourceAsStream("/port-visit-shapefiles/" + items[2]))))
263 .doOnNext(x -> System.out.println(x.name + " - " + x.visitRegion.contains(-33.8568, 151.2153)))
264 .toList()
265 .toBlocking().single();
266 }
267 return ports;
268 }
269
270 private static Observable<File> logPercentCompleted(int numFiles, long startTime, Observable<File> o,
271 AtomicInteger fileNumber) {
272 return o.doOnNext(file -> {
273 int n = fileNumber.incrementAndGet();
274 if (n % 1000 == 0) {
275 long t = System.currentTimeMillis();
276 long timeRemainingSeconds = Math.round(((double) t - startTime) / n * (numFiles - n)) / 1000;
277 System.out.println("complete: " + new DecimalFormat("0.0").format(n / (double) numFiles * 100)
278 + "%, seconds remaining " + timeRemainingSeconds);
279 }
280 });
281 }
282
283 private enum EezStatus {
284 IN, OUT, UNKNOWN;
285
286 public static EezStatus from(boolean inEez) {
287 return inEez ? IN : OUT;
288 }
289 }
290
291 public static final class TimedLeg {
292
293 private static final DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
294 public final int mmsi;
295 public final TimedWaypoint a;
296 public final TimedWaypoint b;
297
298 public TimedLeg(int mmsi, TimedWaypoint a, TimedWaypoint b) {
299 Preconditions.checkNotNull(a);
300 Preconditions.checkNotNull(b);
301 this.mmsi = mmsi;
302 this.a = a;
303 this.b = b;
304 }
305
306 @Override
307 public String toString() {
308 return format.format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(a.time), ZoneOffset.UTC)) + "->"
309 + format.format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(b.time), ZoneOffset.UTC)) + " "
310 + a.waypoint.name() + "->" + b.waypoint.name();
311 }
312 }
313
314 private static final class State {
315 final TimedWaypoint timedWaypoint;
316 final Fix latestFix;
317 final EezStatus fixStatus;
318
319 State(TimedWaypoint waypoint, Fix latestFix, EezStatus fixStatus) {
320 this.timedWaypoint = waypoint;
321 this.latestFix = latestFix;
322 this.fixStatus = fixStatus;
323 }
324
325 }
326
/**
 * Converts a time-ordered stream of fixes into legs between waypoints (ports and EEZ crossing
 * waypoints).
 *
 * <p>State is kept per subscription (via {@code defer}) in a one-element array. For each fix:
 * <ul>
 * <li>If the fix's EEZ status flipped relative to the previous fix (IN to OUT or OUT to IN),
 * the closest EEZ waypoint to the boundary crossing is found. A previous status of UNKNOWN
 * never counts as a flip. If a previous waypoint is known, a leg from it to that EEZ waypoint is
 * emitted. The EEZ waypoint then becomes the start of the next leg.</li>
 * <li>If the fix is inside the EEZ and inside a port's visit region, the port (timed at this
 * fix) becomes the start of the next leg. A leg into the port is emitted only when the previous
 * status is known, a previous waypoint exists, and that waypoint is not the same port.</li>
 * </ul>
 * Staying in the same port therefore just moves the port's time forward to the latest fix in
 * it.
 *
 * @param eezLine EEZ boundary line, used to locate crossing points
 * @param eezPolygon EEZ region, used to decide whether each fix is inside the EEZ
 * @param ports candidate ports; the first whose visit region contains a fix is used
 * @param eezWaypoints candidate waypoints for EEZ crossings
 * @param fixes fixes in non-decreasing time order. NOTE(review): each leg takes its MMSI from
 *     the current fix, so this presumably expects one vessel per stream - confirm with callers
 * @return the legs, emitted as each one is completed
 */
@VisibleForTesting
static Observable<TimedLeg> toLegs(Shapefile eezLine, Shapefile eezPolygon, Collection<Port> ports,
        Collection<EezWaypoint> eezWaypoints, Observable<Fix> fixes) {
    return Observable.defer(() ->
    {
        // mutable holder so the flatMap lambda can replace the state; one per subscription
        State[] state = new State[1];
        state[0] = new State(null, null, EezStatus.UNKNOWN);
        return fixes
                .flatMap(fix -> {
                    // legs completed by this fix: at most a crossing leg plus a port leg;
                    // stays null when none
                    List<TimedLeg> legs = null;

                    boolean inEez = eezPolygon.contains(fix.lat(), fix.lon());
                    State current = state[0];
                    // an out-of-order fix raises IllegalArgumentException as a stream error
                    Preconditions.checkArgument(current.latestFix == null || fix.time() >= current.latestFix.time(),
                            "fixes out of time order!");
                    // UNKNOWN (no previous fix) never counts as a crossing
                    boolean crossed = (inEez && current.fixStatus == EezStatus.OUT)
                            || (!inEez && current.fixStatus == EezStatus.IN);
                    if (crossed) {
                        // current.latestFix is non-null here because fixStatus is not UNKNOWN
                        TimedWaypoint closestWaypoint = findClosestWaypoint(eezLine, eezWaypoints, fix, current);
                        if (current.timedWaypoint != null) {
                            if (legs == null) {
                                legs = new ArrayList<>(2);
                            }
                            legs.add(new TimedLeg(fix.mmsi(), current.timedWaypoint, closestWaypoint));
                        }
                        // local update only; every branch below reassigns state[0] from it
                        current = new State(closestWaypoint, fix, EezStatus.from(inEez));
                    }



                    if (inEez) {
                        Optional<Port> port = findPort(ports, fix.lat(), fix.lon());
                        if (port.isPresent()) {
                            TimedWaypoint portTimedWaypoint = new TimedWaypoint(port.get(), fix.time());
                            // the port (at this fix's time) is always the new start waypoint
                            state[0] = new State(portTimedWaypoint, fix, EezStatus.IN);
                            // reference comparison: both sides come from the same ports collection
                            if (current.fixStatus != EezStatus.UNKNOWN && current.timedWaypoint != null
                                    && current.timedWaypoint.waypoint != port.get()) {
                                // redundant with the condition above; kept as-is
                                if (current.timedWaypoint != null) {
                                    if (legs == null) {
                                        legs = new ArrayList<>(2);
                                    }
                                    legs.add(new TimedLeg(fix.mmsi(), current.timedWaypoint, portTimedWaypoint));
                                }
                            }
                        } else {
                            // in the EEZ but not in a port: keep the start waypoint, record the fix
                            state[0] = new State(current.timedWaypoint, fix, EezStatus.IN);
                        }
                    } else {
                        // outside the EEZ: keep the start waypoint, record the fix
                        state[0] = new State(current.timedWaypoint, fix, EezStatus.OUT);
                    }
                    if (legs == null) {
                        return Observable.empty();
                    } else {
                        return Observable.from(legs);
                    }

                });
    });

}
387
388 private static TimedWaypoint findClosestWaypoint(Shapefile eezLine, Collection<EezWaypoint> eezWaypoints, Fix fix,
389 State previous) {
390 TimedPosition crossingPoint = ShapefileUtil.findRegionCrossingPoint(eezLine, previous.latestFix, fix);
391 EezWaypoint closest = null;
392 double closestDistanceKm = 0;
393 for (EezWaypoint w : eezWaypoints) {
394 double d = distanceKm(crossingPoint.lat, crossingPoint.lon, w.lat, w.lon);
395 if (closest == null || (d < closestDistanceKm && d <= w.thresholdKm.orElse(Double.MAX_VALUE))) {
396 closest = w;
397 closestDistanceKm = d;
398 }
399 }
400 Preconditions.checkNotNull(closest, "no eez waypoint found!");
401 return new TimedWaypoint(closest, crossingPoint.time);
402 }
403
404 private static Optional<Port> findPort(Collection<Port> ports, float lat, float lon) {
405 for (Port port : ports) {
406 if (port.visitRegion.contains(lat, lon)) {
407 return Optional.of(port);
408 }
409 }
410 return Optional.empty();
411 }
412
413 private static double distanceKm(double lat, double lon, double lat2, double lon2) {
414 return Position.create(lat, lon).getDistanceToKm(Position.create(lat2, lon2));
415 }
416
/** A place a leg can start or end at. Nested interfaces are implicitly static. */
public interface Waypoint {

    /** Short identifier written into the legs output file. */
    String code();

    /** Human-readable name, used when rendering a TimedLeg as text. */
    String name();
}
422
423 @VisibleForTesting
424 public static final class EezWaypoint implements Waypoint {
425 final String name;
426 final double lat;
427 final double lon;
428 final Optional<Double> thresholdKm;
429
430 EezWaypoint(String name, double lat, double lon, Optional<Double> thresholdKm) {
431 this.name = name;
432 this.lat = lat;
433 this.lon = lon;
434 this.thresholdKm = thresholdKm;
435 }
436
437 @Override
438 public String name() {
439 return name;
440 }
441
442 @Override
443 public String toString() {
444 return "EezWaypoint [name=" + name + ", lat=" + lat + ", lon=" + lon + ", thresholdKm=" + thresholdKm + "]";
445 }
446
447 @Override
448 public String code() {
449 return name();
450 }
451
452 }
453
454 @VisibleForTesting
455 public static final class Port implements Waypoint {
456 public final String name;
457 public final String code;
458 public final Shapefile visitRegion;
459
460 Port(String name, String code, Shapefile visitRegion) {
461 this.name = name;
462 this.code = code;
463 this.visitRegion = visitRegion;
464 }
465
466 @Override
467 public String name() {
468 return name;
469 }
470
471 @Override
472 public String toString() {
473 return "Port [name=" + name + "]";
474 }
475
476 @Override
477 public String code() {
478 return code;
479 }
480
481 }
482
483 @VisibleForTesting
484 public static final class TimedWaypoint {
485 public final Waypoint waypoint;
486 public final long time;
487
488 TimedWaypoint(Waypoint waypoint, long time) {
489 this.waypoint = waypoint;
490 this.time = time;
491 }
492
493 @Override
494 public String toString() {
495 return "TimedWaypoint [waypoint=" + waypoint + ", time=" + time + "]";
496 }
497
498 }
499
500 public static void main(String[] args) throws Exception {
501 File output = new File("target/legs.txt");
502 File fixesOutput = new File("/media/an/temp/fixes");
503 List<File> list = new ArrayList<File>();
504
505 String baseFilename = "/media/an/binary-fixes-5-minute/";
506 Pattern pattern = Pattern.compile(".*\\.track");
507 list.addAll(Files.find(new File(baseFilename + "2014"), pattern));
508 list.addAll(Files.find(new File(baseFilename + "2015"), pattern));
509 list.addAll(Files.find(new File(baseFilename + "2016"), pattern));
510
511 VoyageDatasetProducer.produce(output, fixesOutput, list);
512 }
513 }