View Javadoc
1   package au.gov.amsa.geo;
2   
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;

import com.github.davidmoten.grumpy.core.Position;
import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import com.github.davidmoten.rx.Checked;
import com.google.common.base.Preconditions;

import au.gov.amsa.geo.distance.EffectiveSpeedCheck;
import au.gov.amsa.geo.distance.OperatorEffectiveSpeedChecker;
import au.gov.amsa.geo.model.SegmentOptions;
import au.gov.amsa.gt.Shapefile;
import au.gov.amsa.risky.format.BinaryFixes;
import au.gov.amsa.risky.format.BinaryFixesFormat;
import au.gov.amsa.risky.format.Fix;
import au.gov.amsa.streams.Strings;
import au.gov.amsa.util.Files;
import rx.Observable;
47  
48  public final class VoyageDatasetProducer {
49  
50      private static final String COMMA = ",";
51      private static final DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm'Z'");
52  
53      public static void produce(File output, File fixesOutput, List<File> list) throws Exception {
54          // reset output directories
55          output.delete();
56          FileUtils.deleteDirectory(fixesOutput);
57  
58          int numFiles = list.size();
59          System.out.println(numFiles + "binary fix files");
60  
61          AtomicInteger fileNumber = new AtomicInteger(0);
62          Collection<Port> ports = loadPorts();
63          Collection<EezWaypoint> eezWaypoints = readEezWaypoints();
64          Shapefile eezLine = Eez.loadEezLine();
65          Shapefile eezPolygon = Eez.loadEezPolygon();
66          System.out.println("loaded eez shapefiles");
67          long t = System.currentTimeMillis();
68          AtomicLong failedCheck = new AtomicLong();
69          AtomicLong fixCount = new AtomicLong();
70          Map<Integer, Integer> mmsisWithFailedChecks = new TreeMap<>();
71          Persister persister = new Persister(fixesOutput);
72  
73          try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(output)))) {
74  
75              // Note that in the observable below we don't employ parallel techniques
76              // this is because the runtime is acceptable
77              Observable.from(list) //
78                      .groupBy(f -> mmsiFromFilename(f)) //
79                      .flatMap(files -> {
80                          String mmsi = files.getKey();
81                          if (!isShipMmsi(mmsi)) {
82                              return Observable.empty();
83                          } else {
84                              return files //
85                                      .compose(o -> logPercentCompleted(numFiles, t, o, fileNumber)) //
86                                      .concatMap(BinaryFixes::from) //
87                                      .lift(new OperatorEffectiveSpeedChecker(
88                                              SegmentOptions.builder().acceptAnyFixHours(24L).maxSpeedKnots(50).build()))
89                                      .doOnNext(
90                                              check -> updatedCounts(failedCheck, fixCount, mmsisWithFailedChecks, check)) //
91                                      .filter(check -> check.isOk()) //
92                                      .map(check -> check.fix()) //
93                                      .doOnNext(fix -> persister.persist(fix))
94                                      .compose(o -> toLegs(eezLine, eezPolygon, ports, eezWaypoints, o)) //
95                                      .filter(x -> includeLeg(x));
96                          }
97                      } //
98  
99                      ) //
100                     .sorted((a, b) -> compareByMmsiThenLegStartTime(a, b)) //
101                     .doOnNext(x -> write(writer, x)) //
102                     .doOnTerminate(Checked.a0(() -> persister.close())) //
103                     .toBlocking() //
104                     .subscribe();
105             System.out.println((System.currentTimeMillis() - t) + "ms");
106             System.out.println("total fixes=" + fixCount.get());
107             System.out.println("num fixes rejected due failed effective speed check=" + failedCheck.get());
108             System.out.println("num mmsis with failed effective speed checks=" + mmsisWithFailedChecks.size());
109 
110             try (PrintStream p = new PrintStream("target/info.txt")) {
111                 p.println("total fixes=" + fixCount.get());
112                 p.println("num fixes rejected due failed effective speed check=" + failedCheck.get());
113                 p.println("num mmsis with failed effective speed checks=" + mmsisWithFailedChecks.size());
114             }
115             try (PrintStream p = new PrintStream("target/failures.txt")) {
116                 p.println("failures mmsi <TAB> number of rejected fixes");
117                 for (Integer mmsi : mmsisWithFailedChecks.keySet()) {
118                     p.println(mmsi + "\t" + mmsisWithFailedChecks.get(mmsi));
119                 }
120             }
121         }
122     }
123 
124     // not thread-safe!!!!
125     final static class Persister implements Closeable {
126 
127         private final File outputDirectory;
128         private OutputStream currentPersistOutputStream;
129         private int currentPersistMmsi = -1;
130 
131         public Persister(File outputDirectory) {
132             this.outputDirectory = outputDirectory;
133             outputDirectory.mkdirs();
134         }
135 
136         /**
137          * Writes fix to binary track file in {@code directory}.
138          * 
139          * @param fix
140          *            fix to persist to file
141          */
142         void persist(Fix fix) {
143             // Note that this logic only works if this method called serially
144             // don't call this in parallel processing!
145             if (fix.mmsi() != currentPersistMmsi) {
146                 currentPersistMmsi = fix.mmsi();
147                 if (currentPersistOutputStream != null) {
148                     try {
149                         currentPersistOutputStream.close();
150                     } catch (IOException e) {
151                         throw new RuntimeException(e);
152                     }
153                 }
154                 try {
155                     currentPersistOutputStream = new BufferedOutputStream(
156                             new FileOutputStream(new File(outputDirectory, fix.mmsi() + ".track"), true));
157                 } catch (FileNotFoundException e) {
158                     throw new RuntimeException(e);
159                 }
160 
161             }
162             BinaryFixes.write(fix, currentPersistOutputStream, BinaryFixesFormat.WITHOUT_MMSI);
163         }
164 
165         @Override
166         public void close() throws IOException {
167             if (currentPersistOutputStream != null) {
168                 currentPersistOutputStream.close();
169             }
170 
171         }
172     }
173 
174     private static String mmsiFromFilename(File f) {
175         return f.getName().substring(0, f.getName().indexOf("."));
176     }
177 
178     private static boolean isShipMmsi(String m) {
179         // first digit
180         // 0 - station, ship group
181         // 1 - SAR aircraft
182         // 8 - handheld VHF transceiver
183         // 9 - custom devices
184         return (m.length() == 9 && !m.startsWith("0") && !m.startsWith("1") && !m.startsWith("8")
185                 && !m.startsWith("9"));
186     }
187 
188     private static int compareByMmsiThenLegStartTime(TimedLeg x, TimedLeg y) {
189         if (x.mmsi == y.mmsi) {
190             // compare using just the start time of the leg
191             return Long.compare(x.a.time, y.a.time);
192         } else {
193             return Integer.compare(x.mmsi, y.mmsi);
194         }
195     }
196 
197     private static void updatedCounts(AtomicLong failedCheck, AtomicLong fixCount,
198             Map<Integer, Integer> mmsisWithFailedChecks, EffectiveSpeedCheck check) {
199         fixCount.incrementAndGet();
200         if (!check.isOk()) {
201             int count = mmsisWithFailedChecks.getOrDefault(check.fix().mmsi(), 0);
202             mmsisWithFailedChecks.put(check.fix().mmsi(), count + 1);
203             failedCheck.incrementAndGet();
204         }
205     }
206 
207     private static void write(BufferedWriter writer, TimedLeg x) {
208         try {
209             writer.write(String.valueOf(x.mmsi));
210             writer.write(COMMA);
211             writer.write(formatTime(x.a.time));
212             writer.write(COMMA);
213             writer.write(x.a.waypoint.code());
214             writer.write(COMMA);
215             writer.write(formatTime(x.b.time));
216             writer.write(COMMA);
217             writer.write(x.b.waypoint.code());
218             writer.write("\n");
219         } catch (Exception e) {
220             throw new RuntimeException(e);
221         }
222     }
223 
224     private static boolean includeLeg(TimedLeg x) {
225         // exclude EEZ -> EEZ
226         return !(x.a.waypoint instanceof EezWaypoint && x.b.waypoint instanceof EezWaypoint);
227     }
228 
229     private static String formatTime(long t) {
230         return format.format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(t), ZoneOffset.UTC));
231     }
232 
233     private static Collection<EezWaypoint> readEezWaypoints() throws IOException {
234         Collection<EezWaypoint> eezWaypoints;
235         try (Reader reader = new InputStreamReader(
236                 VoyageDatasetProducer.class.getResourceAsStream("/eez-waypoints.csv"))) {
237             eezWaypoints = Strings.lines(reader) //
238                     .map(line -> line.trim()) //
239                     .filter(line -> line.length() > 0) //
240                     .filter(line -> !line.startsWith("#")) //
241                     .map(line -> line.split(COMMA))
242                     .map(items -> new EezWaypoint(items[0], Double.parseDouble(items[2]), Double.parseDouble(items[1]),
243                             // TODO read thresholdKm
244                             Optional.empty())) //
245                     .doOnNext(System.out::println) //
246                     .toList() //
247                     .toBlocking().single();
248         }
249         return eezWaypoints;
250     }
251 
252     static Collection<Port> loadPorts() throws IOException {
253         Collection<Port> ports;
254         try (Reader reader = new InputStreamReader(VoyageDatasetProducer.class.getResourceAsStream("/ports.txt"))) {
255             ports = Strings.lines(reader) //
256                     .map(line -> line.trim()) //
257                     .filter(line -> line.length() > 0) //
258                     .filter(line -> !line.startsWith("#")) //
259                     .map(line -> line.split("\t"))
260                     .map(items -> new Port(items[0], items[1],
261                             Shapefile.fromZip(VoyageDatasetProducer.class
262                                     .getResourceAsStream("/port-visit-shapefiles/" + items[2])))) //
263                     .doOnNext(x -> System.out.println(x.name + " - " + x.visitRegion.contains(-33.8568, 151.2153))) //
264                     .toList() //
265                     .toBlocking().single();
266         }
267         return ports;
268     }
269 
270     private static Observable<File> logPercentCompleted(int numFiles, long startTime, Observable<File> o,
271             AtomicInteger fileNumber) {
272         return o.doOnNext(file -> {
273             int n = fileNumber.incrementAndGet();
274             if (n % 1000 == 0) {
275                 long t = System.currentTimeMillis();
276                 long timeRemainingSeconds = Math.round(((double) t - startTime) / n * (numFiles - n)) / 1000;
277                 System.out.println("complete: " + new DecimalFormat("0.0").format(n / (double) numFiles * 100)
278                         + "%, seconds remaining " + timeRemainingSeconds);
279             }
280         });
281     }
282 
283     private enum EezStatus {
284         IN, OUT, UNKNOWN;
285 
286         public static EezStatus from(boolean inEez) {
287             return inEez ? IN : OUT;
288         }
289     }
290 
291     public static final class TimedLeg {
292 
293         private static final DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
294         public final int mmsi;
295         public final TimedWaypoint a;
296         public final TimedWaypoint b;
297 
298         public TimedLeg(int mmsi, TimedWaypoint a, TimedWaypoint b) {
299             Preconditions.checkNotNull(a);
300             Preconditions.checkNotNull(b);
301             this.mmsi = mmsi;
302             this.a = a;
303             this.b = b;
304         }
305 
306         @Override
307         public String toString() {
308             return format.format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(a.time), ZoneOffset.UTC)) + "->"
309                     + format.format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(b.time), ZoneOffset.UTC)) + " "
310                     + a.waypoint.name() + "->" + b.waypoint.name();
311         }
312     }
313 
314     private static final class State {
315         final TimedWaypoint timedWaypoint; // nullable
316         final Fix latestFix; // nullable
317         final EezStatus fixStatus; // defaults to UNKNOWN
318 
319         State(TimedWaypoint waypoint, Fix latestFix, EezStatus fixStatus) {
320             this.timedWaypoint = waypoint;
321             this.latestFix = latestFix;
322             this.fixStatus = fixStatus;
323         }
324 
325     }
326 
327     @VisibleForTesting
328     static Observable<TimedLeg> toLegs(Shapefile eezLine, Shapefile eezPolygon, Collection<Port> ports,
329             Collection<EezWaypoint> eezWaypoints, Observable<Fix> fixes) {
330         return Observable.defer(() -> //
331         {
332             State[] state = new State[1];
333             state[0] = new State(null, null, EezStatus.UNKNOWN);
334             return fixes //
335                     .flatMap(fix -> {
336                         List<TimedLeg> legs = null; // only create when needed to reduce
337                                                     // allocations
338                         boolean inEez = eezPolygon.contains(fix.lat(), fix.lon());
339                         State current = state[0];
340                         Preconditions.checkArgument(current.latestFix == null || fix.time() >= current.latestFix.time(),
341                                 "fixes out of time order!");
342                         boolean crossed = (inEez && current.fixStatus == EezStatus.OUT)
343                                 || (!inEez && current.fixStatus == EezStatus.IN);
344                         if (crossed) {
345                             TimedWaypoint closestWaypoint = findClosestWaypoint(eezLine, eezWaypoints, fix, current);
346                             if (current.timedWaypoint != null) {
347                                 if (legs == null) {
348                                     legs = new ArrayList<>(2);
349                                 }
350                                 legs.add(new TimedLeg(fix.mmsi(), current.timedWaypoint, closestWaypoint));
351                             }
352                             current = new State(closestWaypoint, fix, EezStatus.from(inEez));
353                         }
354                         // Note that may have detected eez crossing but also
355                         // have arrived in port so may need to return both
356                         // waypoints
357                         if (inEez) {
358                             Optional<Port> port = findPort(ports, fix.lat(), fix.lon());
359                             if (port.isPresent()) {
360                                 TimedWaypoint portTimedWaypoint = new TimedWaypoint(port.get(), fix.time());
361                                 state[0] = new State(portTimedWaypoint, fix, EezStatus.IN);
362                                 if (current.fixStatus != EezStatus.UNKNOWN && current.timedWaypoint != null
363                                         && current.timedWaypoint.waypoint != port.get()) {
364                                     if (current.timedWaypoint != null) {
365                                         if (legs == null) {
366                                             legs = new ArrayList<>(2);
367                                         }
368                                         legs.add(new TimedLeg(fix.mmsi(), current.timedWaypoint, portTimedWaypoint));
369                                     }
370                                 }
371                             } else {
372                                 state[0] = new State(current.timedWaypoint, fix, EezStatus.IN);
373                             }
374                         } else {
375                             state[0] = new State(current.timedWaypoint, fix, EezStatus.OUT);
376                         }
377                         if (legs == null) {
378                             return Observable.empty();
379                         } else {
380                             return Observable.from(legs);
381                         }
382 
383                     });
384         });
385 
386     }
387 
388     private static TimedWaypoint findClosestWaypoint(Shapefile eezLine, Collection<EezWaypoint> eezWaypoints, Fix fix,
389             State previous) {
390         TimedPosition crossingPoint = ShapefileUtil.findRegionCrossingPoint(eezLine, previous.latestFix, fix);
391         EezWaypoint closest = null;
392         double closestDistanceKm = 0;
393         for (EezWaypoint w : eezWaypoints) {
394             double d = distanceKm(crossingPoint.lat, crossingPoint.lon, w.lat, w.lon);
395             if (closest == null || (d < closestDistanceKm && d <= w.thresholdKm.orElse(Double.MAX_VALUE))) {
396                 closest = w;
397                 closestDistanceKm = d;
398             }
399         }
400         Preconditions.checkNotNull(closest, "no eez waypoint found!");
401         return new TimedWaypoint(closest, crossingPoint.time);
402     }
403 
404     private static Optional<Port> findPort(Collection<Port> ports, float lat, float lon) {
405         for (Port port : ports) {
406             if (port.visitRegion.contains(lat, lon)) {
407                 return Optional.of(port);
408             }
409         }
410         return Optional.empty();
411     }
412 
413     private static double distanceKm(double lat, double lon, double lat2, double lon2) {
414         return Position.create(lat, lon).getDistanceToKm(Position.create(lat2, lon2));
415     }
416 
417     public static interface Waypoint {
418         String name();
419 
420         String code();
421     }
422 
423     @VisibleForTesting
424     public static final class EezWaypoint implements Waypoint {
425         final String name;
426         final double lat;
427         final double lon;
428         final Optional<Double> thresholdKm;
429 
430         EezWaypoint(String name, double lat, double lon, Optional<Double> thresholdKm) {
431             this.name = name;
432             this.lat = lat;
433             this.lon = lon;
434             this.thresholdKm = thresholdKm;
435         }
436 
437         @Override
438         public String name() {
439             return name;
440         }
441 
442         @Override
443         public String toString() {
444             return "EezWaypoint [name=" + name + ", lat=" + lat + ", lon=" + lon + ", thresholdKm=" + thresholdKm + "]";
445         }
446 
447         @Override
448         public String code() {
449             return name();
450         }
451 
452     }
453 
454     @VisibleForTesting
455     public static final class Port implements Waypoint {
456         public final String name;
457         public final String code;
458         public final Shapefile visitRegion;
459 
460         Port(String name, String code, Shapefile visitRegion) {
461             this.name = name;
462             this.code = code;
463             this.visitRegion = visitRegion;
464         }
465 
466         @Override
467         public String name() {
468             return name;
469         }
470 
471         @Override
472         public String toString() {
473             return "Port [name=" + name + "]";
474         }
475 
476         @Override
477         public String code() {
478             return code;
479         }
480 
481     }
482 
483     @VisibleForTesting
484     public static final class TimedWaypoint {
485         public final Waypoint waypoint;
486         public final long time;
487 
488         TimedWaypoint(Waypoint waypoint, long time) {
489             this.waypoint = waypoint;
490             this.time = time;
491         }
492 
493         @Override
494         public String toString() {
495             return "TimedWaypoint [waypoint=" + waypoint + ", time=" + time + "]";
496         }
497 
498     }
499 
500     public static void main(String[] args) throws Exception {
501         File output = new File("target/legs.txt");
502         File fixesOutput = new File("/media/an/temp/fixes");
503         List<File> list = new ArrayList<File>();
504 
505         String baseFilename = "/media/an/binary-fixes-5-minute/";
506         Pattern pattern = Pattern.compile(".*\\.track");
507         list.addAll(Files.find(new File(baseFilename + "2014"), pattern));
508         list.addAll(Files.find(new File(baseFilename + "2015"), pattern));
509         list.addAll(Files.find(new File(baseFilename + "2016"), pattern));
510 
511         VoyageDatasetProducer.produce(output, fixesOutput, list);
512     }
513 }