View Javadoc
1   package au.gov.amsa.craft.analyzer.wms;
2   
3   import java.awt.Color;
4   import java.awt.Graphics2D;
5   import java.awt.Point;
6   import java.io.File;
7   import java.io.FileFilter;
8   import java.io.FileNotFoundException;
9   import java.io.FileOutputStream;
10  import java.io.IOException;
11  import java.io.OutputStreamWriter;
12  import java.nio.charset.StandardCharsets;
13  import java.text.DecimalFormat;
14  import java.util.ArrayList;
15  import java.util.Arrays;
16  import java.util.Comparator;
17  import java.util.Date;
18  import java.util.List;
19  import java.util.concurrent.ConcurrentLinkedQueue;
20  import java.util.concurrent.atomic.AtomicLong;
21  import java.util.zip.GZIPOutputStream;
22  
23  import org.joda.time.format.DateTimeFormat;
24  import org.joda.time.format.DateTimeFormatter;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import rx.Observable;
29  import rx.Observer;
30  import rx.Subscriber;
31  import rx.functions.Action0;
32  import rx.functions.Action1;
33  import rx.functions.Func1;
34  import rx.functions.Func2;
35  import rx.schedulers.Schedulers;
36  import au.gov.amsa.ais.LineAndTime;
37  import au.gov.amsa.ais.ShipTypeDecoder;
38  import au.gov.amsa.ais.rx.Streams;
39  import au.gov.amsa.navigation.DriftCandidate;
40  import au.gov.amsa.navigation.DriftDetector;
41  import au.gov.amsa.navigation.VesselClass;
42  import au.gov.amsa.navigation.VesselPosition;
43  import au.gov.amsa.navigation.VesselPosition.NavigationalStatus;
44  import au.gov.amsa.navigation.ais.AisVesselPositions;
45  import au.gov.amsa.navigation.ais.SortOperator;
46  import au.gov.amsa.risky.format.OperatorMinEffectiveSpeedThreshold.FixWithPreAndPostEffectiveSpeed;
47  
48  import com.github.davidmoten.grumpy.core.Position;
49  import com.github.davidmoten.grumpy.projection.Projector;
50  import com.github.davidmoten.grumpy.wms.Layer;
51  import com.github.davidmoten.grumpy.wms.LayerFeatures;
52  import com.github.davidmoten.grumpy.wms.WmsRequest;
53  import com.github.davidmoten.grumpy.wms.WmsUtil;
54  import com.github.davidmoten.rtree.RTree;
55  import com.github.davidmoten.rtree.geometry.Geometries;
56  import com.github.davidmoten.rtree.geometry.Rectangle;
57  import com.github.davidmoten.rx.slf4j.Logging;
58  import com.google.common.base.Optional;
59  import com.google.common.base.Preconditions;
60  
61  public class DriftingLayer implements Layer {
62  
63      private static final int SHIP_TYPE_FISHING = 30;
64      private static final int SHIP_TYPE_DREDGING_OR_UNDERWATER_OPERATIONS = 33;
65      private static final int SHIP_TYPE_TUG = 52;
66      private static final int SHIP_TYPE_MILITARY_OPERATIONS = 35;
67      private static final int SHIP_TYPE_LAW_ENFORCEMENT = 55;
68  
69      private static Logger log = LoggerFactory.getLogger(DriftingLayer.class);
70  
71      private final ConcurrentLinkedQueue<VesselPosition> queue = new ConcurrentLinkedQueue<VesselPosition>();
72  
73      private volatile RTree<VesselPosition, com.github.davidmoten.rtree.geometry.Point> tree = RTree
74              .maxChildren(4).star().create();
75  
76      public DriftingLayer() {
77          log.info("creating Drifting layer");
78          // collect drifting candidates
79          String filename = System.getProperty("drift.candidates", System.getProperty("user.home")
80                  + "/drift-candidates.txt");
81          Sources.fixes2(new File(filename))
82                  // log
83                  .lift(Logging.<VesselPosition> logger().showCount().showMemory().every(10000).log())
84                  // only emit those drifters that have drifted a decent distance
85                  // since start of drift
86                  .lift(new OperatorDriftDistanceCheck())
87                  // only class A vessels
88                  .filter(onlyClassA())
89                  // exclude anchored
90                  .filter(not(atAnchor()))
91                  // exclude moored
92                  .filter(not(isMoored()))
93                  // group by id and date
94                  .distinct(byIdAndTimePattern("yyyy-MM-dd HH"))
95                  // add to queue
96                  .doOnNext(addToQueue())
97                  // run in background
98                  .subscribeOn(Schedulers.io())
99                  // subscribe
100                 .subscribe(createObserver());
101     }
102 
103     private static Func1<FixWithPreAndPostEffectiveSpeed, VesselPosition> toVesselPosition() {
104         return new Func1<FixWithPreAndPostEffectiveSpeed, VesselPosition>() {
105             @Override
106             public VesselPosition call(FixWithPreAndPostEffectiveSpeed f) {
107                 return (VesselPosition) f.fixWrapper();
108             }
109         };
110     }
111 
112     private static Observable<VesselPosition> getDrifters() {
113         return getFilenames()
114         // need to leave a processor spare to process the merged items
115         // and another for gc perhaps
116                 .buffer(Runtime.getRuntime().availableProcessors() - 1)
117                 // convert list to Observable
118                 .map(DriftingLayer.<String> iterableToObservable())
119                 // get positions for each window
120                 .concatMap(detectDrifters());
121     }
122 
123     private static Observable<VesselPosition> getDriftingPositions(Observable<String> filenames) {
124         return filenames
125         // get the positions from each file
126         // use concatMap till merge bug is fixed RxJava
127         // https://github.com/ReactiveX/RxJava/issues/1941
128         // log filename
129                 .lift(Logging.<String> logger().onNextPrefix("loading file=").showValue().log())
130                 // extract positions from file
131                 .flatMap(filenameToDriftCandidates())
132                 // convert back to vessel position
133                 .map(driftCandidateToVesselPosition())
134                 // log
135                 // .lift(Logging.<VesselPosition>logger().log())
136                 // only class A vessels
137                 .filter(onlyClassA())
138                 // ignore vessels at anchor
139                 .filter(not(atAnchor()))
140                 // ignore vessels at moorings
141                 .filter(not(isMoored()))
142                 // ignore vessels that might be fishing
143                 .filter(not(isShipType(SHIP_TYPE_FISHING)))
144                 // ignore vessels that might be dredging
145                 .filter(not(isShipType(SHIP_TYPE_DREDGING_OR_UNDERWATER_OPERATIONS)))
146                 // ignore tugs
147                 .filter(not(isShipType(SHIP_TYPE_TUG)))
148                 // ignore military
149                 .filter(not(isShipType(SHIP_TYPE_MILITARY_OPERATIONS)))
150                 // ignore military
151                 .filter(not(isShipType(SHIP_TYPE_LAW_ENFORCEMENT)))
152                 // is a big vessel
153                 .filter(isBig())
154                 // group by id and date
155                 .distinct(byIdAndTimePattern("yyyy-MM-dd"));
156     }
157 
158     private static Func1<DriftCandidate, VesselPosition> driftCandidateToVesselPosition() {
159         return new Func1<DriftCandidate, VesselPosition>() {
160 
161             @Override
162             public VesselPosition call(DriftCandidate c) {
163                 return (VesselPosition) c.fixWwrapper();
164             }
165         };
166     }
167 
168     private static Func1<VesselPosition, Boolean> isShipType(final int shipType) {
169         return new Func1<VesselPosition, Boolean>() {
170 
171             @Override
172             public Boolean call(VesselPosition vp) {
173                 return vp.shipType().isPresent() && vp.shipType().get() == shipType;
174             }
175         };
176     }
177 
178     public static <T> Func1<T, Boolean> not(final Func1<T, Boolean> f) {
179         return new Func1<T, Boolean>() {
180             @Override
181             public Boolean call(T t) {
182                 return !f.call(t);
183             }
184         };
185     }
186 
187     private static Observable<String> getFilenames() {
188         List<String> filenames = new ArrayList<String>();
189         final String filenameBase = "/media/analysis/nmea/2014/sorted-NMEA_ITU_201407";
190         for (int i = 1; i <= 31; i++) {
191             String filename = filenameBase + new DecimalFormat("00").format(i) + ".gz";
192             if (new File(filename).exists()) {
193                 filenames.add(filename);
194                 log.info("adding filename " + filename);
195             }
196         }
197         return Observable.from(filenames);
198     }
199 
200     private static Func1<VesselPosition, Boolean> isBig() {
201         return new Func1<VesselPosition, Boolean>() {
202             @Override
203             public Boolean call(VesselPosition p) {
204                 return !p.lengthMetres().isPresent() || p.lengthMetres().get() > 50;
205             }
206         };
207     }
208 
209     private static Func1<VesselPosition, Boolean> onlyClassA() {
210         return new Func1<VesselPosition, Boolean>() {
211             @Override
212             public Boolean call(VesselPosition p) {
213                 return p.cls() == VesselClass.A;
214             }
215         };
216     }
217 
218     private static Func1<VesselPosition, Boolean> atAnchor() {
219         return new Func1<VesselPosition, Boolean>() {
220             @Override
221             public Boolean call(VesselPosition p) {
222                 return p.navigationalStatus() == NavigationalStatus.AT_ANCHOR;
223             }
224         };
225     }
226 
227     private static Func1<VesselPosition, Boolean> isMoored() {
228         return new Func1<VesselPosition, Boolean>() {
229             @Override
230             public Boolean call(VesselPosition p) {
231                 return p.navigationalStatus() == NavigationalStatus.MOORED;
232             }
233         };
234     }
235 
    // Number of vessel positions seen so far by filenameToDriftCandidates.
    // Static, so the count is shared by every file being read; it is used
    // only for the throughput log messages.
    private static AtomicLong totalCount = new AtomicLong();
237 
238     private static Func1<String, Observable<DriftCandidate>> filenameToDriftCandidates() {
239         return new Func1<String, Observable<DriftCandidate>>() {
240             @Override
241             public Observable<DriftCandidate> call(final String filename) {
242                 return Streams.nmeaFromGzip(filename)
243                 // extract positions
244                         .compose(AisVesselPositions.positions())
245                         // log requests
246                         .doOnRequest(new Action1<Long>() {
247                             @Override
248                             public void call(Long n) {
249                                 // log.info("requested=" + n);
250                             }
251                         }).doOnNext(new Action1<VesselPosition>() {
252                             final long startTime = System.currentTimeMillis();
253                             long lastTime = System.currentTimeMillis();
254                             DecimalFormat df = new DecimalFormat("0");
255 
256                             @Override
257                             public void call(VesselPosition vp) {
258                                 long n = 100000;
259                                 if (totalCount.incrementAndGet() % n == 0) {
260                                     long now = System.currentTimeMillis();
261                                     final double rate;
262                                     if (now == lastTime)
263                                         rate = -1;
264                                     else {
265                                         rate = n / (double) (now - lastTime) * 1000d;
266                                     }
267                                     lastTime = now;
268                                     final double rateSinceStart;
269                                     if (now == startTime)
270                                         rateSinceStart = -1;
271                                     else
272                                         rateSinceStart = totalCount.get()
273                                                 / (double) (now - startTime) * 1000d;
274                                     log.info("totalCount=" + totalCount.get() + ", msgsPerSecond="
275                                             + df.format(rate) + ", msgPerSecondOverall="
276                                             + df.format(rateSinceStart));
277                                 }
278                             }
279                         })
280                         // detect drift
281                         .compose(DriftDetector.detectDrift())
282 
283                         // backpressure strategy - don't
284                         // .onBackpressureBlock()
285                         // in background thread from pool per file
286                         .subscribeOn(Schedulers.computation())
287                         // log completion of read of file
288                         .doOnCompleted(new Action0() {
289                             @Override
290                             public void call() {
291                                 log.info("finished " + filename);
292                             }
293                         });
294             }
295         };
296     }
297 
298     private Observer<VesselPosition> createObserver() {
299         return new Observer<VesselPosition>() {
300 
301             @Override
302             public void onCompleted() {
303                 System.out.println("done");
304             }
305 
306             @Override
307             public void onError(Throwable e) {
308                 log.error(e.getMessage(), e);
309             }
310 
311             @Override
312             public void onNext(VesselPosition t) {
313                 // do nothing
314             }
315         };
316     }
317 
318     private Action1<VesselPosition> addToQueue() {
319         return new Action1<VesselPosition>() {
320 
321             @Override
322             public void call(VesselPosition p) {
323                 // System.out.println(p.lat() + "\t" + p.lon() + "\t"
324                 // + p.id().uniqueId());
325                 if (queue.size() % 10000 == 0)
326                     System.out.println("queue.size=" + queue.size());
327                 queue.add(p);
328                 tree = tree.add(p, Geometries.point(p.lon(), p.lat()));
329             }
330         };
331     }
332 
333     private static Func1<VesselPosition, String> byIdAndTimePattern(final String timePattern) {
334         return new Func1<VesselPosition, String>() {
335             final DateTimeFormatter format = DateTimeFormat.forPattern(timePattern);
336 
337             @Override
338             public String call(VesselPosition p) {
339                 return p.id().uniqueId() + format.print(p.time());
340             }
341         };
342     }
343 
344     public static final Func2<VesselPosition, VesselPosition, Integer> SORT_BY_TIME = new Func2<VesselPosition, VesselPosition, Integer>() {
345 
346         @Override
347         public Integer call(VesselPosition p1, VesselPosition p2) {
348             return ((Long) p1.time()).compareTo(p2.time());
349         }
350     };
351 
352     @Override
353     public LayerFeatures getFeatures() {
354         return LayerFeatures.builder().crs("EPSG:4326").crs("EPSG:3857").name("Drifting")
355                 .queryable().build();
356     }
357 
358     @Override
359     public String getInfo(Date time, WmsRequest request, final Point point, String mimeType) {
360 
361         final int HOTSPOT_SIZE = 5;
362 
363         final Projector projector = WmsUtil.getProjector(request);
364         final StringBuilder response = new StringBuilder();
365         response.append("<html>");
366         Observable.from(queue)
367         // only vessel positions close to the click point
368                 .filter(new Func1<VesselPosition, Boolean>() {
369                     @Override
370                     public Boolean call(VesselPosition p) {
371                         Point pt = projector.toPoint(p.lat(), p.lon());
372                         return Math.abs(point.x - pt.x) <= HOTSPOT_SIZE
373                                 && Math.abs(point.y - pt.y) <= HOTSPOT_SIZE;
374                     }
375                 })
376                 // add html fragment for each vessel position to the response
377                 .doOnNext(new Action1<VesselPosition>() {
378                     @Override
379                     public void call(VesselPosition p) {
380                         response.append("<p>");
381                         response.append("<a href=\"https://www.fleetmon.com/en/vessels?s="
382                                 + p.id().uniqueId() + "\">mmsi=" + p.id().uniqueId()
383                                 + "</a>, time=" + new Date(p.time()));
384                         if (p.shipType().isPresent()) {
385                             response.append(", ");
386                             response.append(ShipTypeDecoder.getShipType(p.shipType().get()));
387                         }
388                         response.append("</p>");
389                         response.append("<p>");
390                         response.append(p.toString());
391                         response.append("</p>");
392                     }
393                 })
394                 // go!
395                 .subscribe();
396         response.append("</html>");
397         return response.toString();
398     }
399 
400     @Override
401     public void render(Graphics2D g, WmsRequest request) {
402         log.info("request=" + request);
403         log.info("drawing " + queue.size() + " positions");
404         final Projector projector = WmsUtil.getProjector(request);
405         Position a = projector.toPosition(0, 0);
406         Position b = projector.toPosition(request.getWidth(), request.getHeight());
407         Rectangle r = Geometries.rectangle(a.getLon(), b.getLat(), b.getLon(), a.getLat());
408 
409         Optional<VesselPosition> last = Optional.absent();
410         Optional<Point> lastPoint = Optional.absent();
411         // Iterable<VesselPosition> positions = tree
412         // .search(r)
413         // .map(new Func1<Entry<VesselPosition,
414         // com.github.davidmoten.rtree.geometry.Point>, VesselPosition>() {
415         //
416         // @Override
417         // public VesselPosition call(
418         // Entry<VesselPosition, com.github.davidmoten.rtree.geometry.Point>
419         // entry) {
420         // return entry.value();
421         // }
422         //
423         // }).toBlocking().toIterable();
424         ConcurrentLinkedQueue<VesselPosition> positions = queue;
425         Point startPoint = null;
426         for (VesselPosition p : positions) {
427             // expecting positions to be in mmsi, time order
428             Point point = projector.toPoint(p.lat(), p.lon());
429             if (last.isPresent() && p.id().equals(last.get().id()) && p.data().isPresent()
430                     && !p.data().get().equals(p.time()) && isOkMovement(p, last.get())) {
431                 // join the last position with this one with a line
432                 g.setColor(Color.gray);
433                 g.drawLine(lastPoint.get().x, lastPoint.get().y, point.x, point.y);
434             }
435             if (p.data().get().equals(p.time())
436                     || (last.isPresent() && !isOkMovement(p, last.get()))) {
437                 g.setColor(Color.red);
438                 g.drawRect(point.x, point.y, 1, 1);
439                 startPoint = point;
440             } else if (startPoint != null) {
441                 // draw intermediate point
442                 g.setColor(Color.darkGray);
443                 g.drawRect(point.x, point.y, 1, 1);
444                 // redraw startPoint so that a slightly moving drift doesn't
445                 // overdraw the startPoint with the color of an intermediate
446                 // point
447                 g.setColor(Color.red);
448                 g.drawRect(startPoint.x, startPoint.y, 1, 1);
449             }
450             last = Optional.of(p);
451             lastPoint = Optional.of(point);
452 
453         }
454         log.info("drawn");
455 
456     }
457 
458     private static boolean isOkMovement(VesselPosition current, VesselPosition last) {
459         return Position.create(current.lat(), current.lon()).getDistanceToKm(
460                 Position.create(last.lat(), last.lon())) < 15;
461     }
462 
463     private static void sortFile(String filename) throws FileNotFoundException, IOException {
464         Comparator<LineAndTime> comparator = new Comparator<LineAndTime>() {
465             @Override
466             public int compare(LineAndTime line1, LineAndTime line2) {
467                 return ((Long) line1.getTime()).compareTo(line2.getTime());
468             }
469         };
470         final File in = new File(filename);
471         final File outFile = new File(in.getParentFile(), "sorted-" + in.getName());
472         if (outFile.exists()) {
473             log.info("file exists: " + outFile);
474             return;
475         }
476         final OutputStreamWriter out = new OutputStreamWriter(new GZIPOutputStream(
477                 new FileOutputStream(outFile)), StandardCharsets.UTF_8);
478 
479         Streams
480         // read from file
481         .nmeaFromGzip(filename)
482         // get time
483                 .flatMap(Streams.toLineAndTime())
484                 // sort
485                 .lift(new SortOperator<LineAndTime>(comparator, 20000000))
486                 // .lift(Logging.<LineAndTime> logger().showValue().log())
487                 .doOnCompleted(new Action0() {
488                     @Override
489                     public void call() {
490                         try {
491                             out.close();
492                         } catch (IOException e) {
493                         }
494                     }
495                 }).forEach(new Action1<LineAndTime>() {
496                     @Override
497                     public void call(LineAndTime line) {
498                         try {
499                             out.write(line.getLine());
500                             out.write('\n');
501                         } catch (IOException e) {
502                             throw new RuntimeException(e);
503                         }
504                     }
505                 });
506     }
507 
508     private static void sortFiles() throws FileNotFoundException, IOException {
509         // String filename = "/media/analysis/nmea/2014/NMEA_ITU_20140701.gz";
510         File directory = new File("/media/analysis/nmea/2014");
511         Preconditions.checkArgument(directory.exists());
512         File[] files = directory.listFiles(new FileFilter() {
513             @Override
514             public boolean accept(File f) {
515                 return f.getName().startsWith("NMEA_") && f.getName().endsWith(".gz");
516             }
517         });
518         int count = 0;
519 
520         Arrays.sort(files, new Comparator<File>() {
521             @Override
522             public int compare(File f1, File f2) {
523                 return f1.getPath().compareTo(f2.getPath());
524             }
525         });
526 
527         for (File file : files) {
528             count++;
529             log.info("sorting " + count + " of " + files.length + ": " + file);
530             sortFile(file.getAbsolutePath());
531         }
532     }
533 
534     private static <T> Func1<Iterable<T>, Observable<T>> iterableToObservable() {
535         return new Func1<Iterable<T>, Observable<T>>() {
536             @Override
537             public Observable<T> call(Iterable<T> iterable) {
538                 return Observable.from(iterable);
539             }
540         };
541     }
542 
543     private static Func1<Observable<String>, Observable<VesselPosition>> detectDrifters() {
544         return new Func1<Observable<String>, Observable<VesselPosition>>() {
545             @Override
546             public Observable<VesselPosition> call(Observable<String> filenames) {
547                 return getDriftingPositions(filenames);
548             }
549         };
550     }
551 
552     public static void main(String[] args) throws FileNotFoundException, IOException,
553             InterruptedException {
554 
555         getDrifters()
556         // log
557                 .lift(Logging.<VesselPosition> logger().showCount()
558                         .showRateSinceStart("msgPerSecond").showMemory().every(5000).log())
559                 // subscribe
560                 .subscribe(new Subscriber<VesselPosition>() {
561 
562                     @Override
563                     public void onStart() {
564                     }
565 
566                     @Override
567                     public void onCompleted() {
568                         // TODO Auto-generated method stub
569 
570                     }
571 
572                     @Override
573                     public void onError(Throwable e) {
574                         log.error(e.getMessage(), e);
575                         throw new RuntimeException(e);
576                     }
577 
578                     @Override
579                     public void onNext(VesselPosition vp) {
580                         if (vp.shipType().isPresent() && false) {
581                             System.out.println(vp.id() + "," + vp.shipType() + ","
582                                     + ShipTypeDecoder.getShipType(vp.shipType().get())
583                                     + ", length=" + vp.lengthMetres() + ", cog=" + vp.cogDegrees()
584                                     + ", heading=" + vp.headingDegrees() + ", speedKnots="
585                                     + (vp.speedMetresPerSecond().get() / 1852.0 * 3600));
586                         }
587                     }
588                 });
589 
590         Thread.sleep(10000000);
591     }
592 
593 }