1 package au.gov.amsa.craft.analyzer.wms;
2
3 import java.awt.Color;
4 import java.awt.Graphics2D;
5 import java.awt.Point;
6 import java.io.File;
7 import java.io.FileFilter;
8 import java.io.FileNotFoundException;
9 import java.io.FileOutputStream;
10 import java.io.IOException;
11 import java.io.OutputStreamWriter;
12 import java.nio.charset.StandardCharsets;
13 import java.text.DecimalFormat;
14 import java.util.ArrayList;
15 import java.util.Arrays;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.List;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.atomic.AtomicLong;
21 import java.util.zip.GZIPOutputStream;
22
23 import org.joda.time.format.DateTimeFormat;
24 import org.joda.time.format.DateTimeFormatter;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import rx.Observable;
29 import rx.Observer;
30 import rx.Subscriber;
31 import rx.functions.Action0;
32 import rx.functions.Action1;
33 import rx.functions.Func1;
34 import rx.functions.Func2;
35 import rx.schedulers.Schedulers;
36 import au.gov.amsa.ais.LineAndTime;
37 import au.gov.amsa.ais.ShipTypeDecoder;
38 import au.gov.amsa.ais.rx.Streams;
39 import au.gov.amsa.navigation.DriftCandidate;
40 import au.gov.amsa.navigation.DriftDetector;
41 import au.gov.amsa.navigation.VesselClass;
42 import au.gov.amsa.navigation.VesselPosition;
43 import au.gov.amsa.navigation.VesselPosition.NavigationalStatus;
44 import au.gov.amsa.navigation.ais.AisVesselPositions;
45 import au.gov.amsa.navigation.ais.SortOperator;
46 import au.gov.amsa.risky.format.OperatorMinEffectiveSpeedThreshold.FixWithPreAndPostEffectiveSpeed;
47
48 import com.github.davidmoten.grumpy.core.Position;
49 import com.github.davidmoten.grumpy.projection.Projector;
50 import com.github.davidmoten.grumpy.wms.Layer;
51 import com.github.davidmoten.grumpy.wms.LayerFeatures;
52 import com.github.davidmoten.grumpy.wms.WmsRequest;
53 import com.github.davidmoten.grumpy.wms.WmsUtil;
54 import com.github.davidmoten.rtree.RTree;
55 import com.github.davidmoten.rtree.geometry.Geometries;
56 import com.github.davidmoten.rtree.geometry.Rectangle;
57 import com.github.davidmoten.rx.slf4j.Logging;
58 import com.google.common.base.Optional;
59 import com.google.common.base.Preconditions;
60
61 public class DriftingLayer implements Layer {
62
63 private static final int SHIP_TYPE_FISHING = 30;
64 private static final int SHIP_TYPE_DREDGING_OR_UNDERWATER_OPERATIONS = 33;
65 private static final int SHIP_TYPE_TUG = 52;
66 private static final int SHIP_TYPE_MILITARY_OPERATIONS = 35;
67 private static final int SHIP_TYPE_LAW_ENFORCEMENT = 55;
68
69 private static Logger log = LoggerFactory.getLogger(DriftingLayer.class);
70
71 private final ConcurrentLinkedQueue<VesselPosition> queue = new ConcurrentLinkedQueue<VesselPosition>();
72
73 private volatile RTree<VesselPosition, com.github.davidmoten.rtree.geometry.Point> tree = RTree
74 .maxChildren(4).star().create();
75
76 public DriftingLayer() {
77 log.info("creating Drifting layer");
78
79 String filename = System.getProperty("drift.candidates", System.getProperty("user.home")
80 + "/drift-candidates.txt");
81 Sources.fixes2(new File(filename))
82
83 .lift(Logging.<VesselPosition> logger().showCount().showMemory().every(10000).log())
84
85
86 .lift(new OperatorDriftDistanceCheck())
87
88 .filter(onlyClassA())
89
90 .filter(not(atAnchor()))
91
92 .filter(not(isMoored()))
93
94 .distinct(byIdAndTimePattern("yyyy-MM-dd HH"))
95
96 .doOnNext(addToQueue())
97
98 .subscribeOn(Schedulers.io())
99
100 .subscribe(createObserver());
101 }
102
103 private static Func1<FixWithPreAndPostEffectiveSpeed, VesselPosition> toVesselPosition() {
104 return new Func1<FixWithPreAndPostEffectiveSpeed, VesselPosition>() {
105 @Override
106 public VesselPosition call(FixWithPreAndPostEffectiveSpeed f) {
107 return (VesselPosition) f.fixWrapper();
108 }
109 };
110 }
111
112 private static Observable<VesselPosition> getDrifters() {
113 return getFilenames()
114
115
116 .buffer(Runtime.getRuntime().availableProcessors() - 1)
117
118 .map(DriftingLayer.<String> iterableToObservable())
119
120 .concatMap(detectDrifters());
121 }
122
123 private static Observable<VesselPosition> getDriftingPositions(Observable<String> filenames) {
124 return filenames
125
126
127
128
129 .lift(Logging.<String> logger().onNextPrefix("loading file=").showValue().log())
130
131 .flatMap(filenameToDriftCandidates())
132
133 .map(driftCandidateToVesselPosition())
134
135
136
137 .filter(onlyClassA())
138
139 .filter(not(atAnchor()))
140
141 .filter(not(isMoored()))
142
143 .filter(not(isShipType(SHIP_TYPE_FISHING)))
144
145 .filter(not(isShipType(SHIP_TYPE_DREDGING_OR_UNDERWATER_OPERATIONS)))
146
147 .filter(not(isShipType(SHIP_TYPE_TUG)))
148
149 .filter(not(isShipType(SHIP_TYPE_MILITARY_OPERATIONS)))
150
151 .filter(not(isShipType(SHIP_TYPE_LAW_ENFORCEMENT)))
152
153 .filter(isBig())
154
155 .distinct(byIdAndTimePattern("yyyy-MM-dd"));
156 }
157
158 private static Func1<DriftCandidate, VesselPosition> driftCandidateToVesselPosition() {
159 return new Func1<DriftCandidate, VesselPosition>() {
160
161 @Override
162 public VesselPosition call(DriftCandidate c) {
163 return (VesselPosition) c.fixWwrapper();
164 }
165 };
166 }
167
168 private static Func1<VesselPosition, Boolean> isShipType(final int shipType) {
169 return new Func1<VesselPosition, Boolean>() {
170
171 @Override
172 public Boolean call(VesselPosition vp) {
173 return vp.shipType().isPresent() && vp.shipType().get() == shipType;
174 }
175 };
176 }
177
178 public static <T> Func1<T, Boolean> not(final Func1<T, Boolean> f) {
179 return new Func1<T, Boolean>() {
180 @Override
181 public Boolean call(T t) {
182 return !f.call(t);
183 }
184 };
185 }
186
187 private static Observable<String> getFilenames() {
188 List<String> filenames = new ArrayList<String>();
189 final String filenameBase = "/media/analysis/nmea/2014/sorted-NMEA_ITU_201407";
190 for (int i = 1; i <= 31; i++) {
191 String filename = filenameBase + new DecimalFormat("00").format(i) + ".gz";
192 if (new File(filename).exists()) {
193 filenames.add(filename);
194 log.info("adding filename " + filename);
195 }
196 }
197 return Observable.from(filenames);
198 }
199
200 private static Func1<VesselPosition, Boolean> isBig() {
201 return new Func1<VesselPosition, Boolean>() {
202 @Override
203 public Boolean call(VesselPosition p) {
204 return !p.lengthMetres().isPresent() || p.lengthMetres().get() > 50;
205 }
206 };
207 }
208
209 private static Func1<VesselPosition, Boolean> onlyClassA() {
210 return new Func1<VesselPosition, Boolean>() {
211 @Override
212 public Boolean call(VesselPosition p) {
213 return p.cls() == VesselClass.A;
214 }
215 };
216 }
217
218 private static Func1<VesselPosition, Boolean> atAnchor() {
219 return new Func1<VesselPosition, Boolean>() {
220 @Override
221 public Boolean call(VesselPosition p) {
222 return p.navigationalStatus() == NavigationalStatus.AT_ANCHOR;
223 }
224 };
225 }
226
227 private static Func1<VesselPosition, Boolean> isMoored() {
228 return new Func1<VesselPosition, Boolean>() {
229 @Override
230 public Boolean call(VesselPosition p) {
231 return p.navigationalStatus() == NavigationalStatus.MOORED;
232 }
233 };
234 }
235
236 private static AtomicLong totalCount = new AtomicLong();
237
238 private static Func1<String, Observable<DriftCandidate>> filenameToDriftCandidates() {
239 return new Func1<String, Observable<DriftCandidate>>() {
240 @Override
241 public Observable<DriftCandidate> call(final String filename) {
242 return Streams.nmeaFromGzip(filename)
243
244 .compose(AisVesselPositions.positions())
245
246 .doOnRequest(new Action1<Long>() {
247 @Override
248 public void call(Long n) {
249
250 }
251 }).doOnNext(new Action1<VesselPosition>() {
252 final long startTime = System.currentTimeMillis();
253 long lastTime = System.currentTimeMillis();
254 DecimalFormat df = new DecimalFormat("0");
255
256 @Override
257 public void call(VesselPosition vp) {
258 long n = 100000;
259 if (totalCount.incrementAndGet() % n == 0) {
260 long now = System.currentTimeMillis();
261 final double rate;
262 if (now == lastTime)
263 rate = -1;
264 else {
265 rate = n / (double) (now - lastTime) * 1000d;
266 }
267 lastTime = now;
268 final double rateSinceStart;
269 if (now == startTime)
270 rateSinceStart = -1;
271 else
272 rateSinceStart = totalCount.get()
273 / (double) (now - startTime) * 1000d;
274 log.info("totalCount=" + totalCount.get() + ", msgsPerSecond="
275 + df.format(rate) + ", msgPerSecondOverall="
276 + df.format(rateSinceStart));
277 }
278 }
279 })
280
281 .compose(DriftDetector.detectDrift())
282
283
284
285
286 .subscribeOn(Schedulers.computation())
287
288 .doOnCompleted(new Action0() {
289 @Override
290 public void call() {
291 log.info("finished " + filename);
292 }
293 });
294 }
295 };
296 }
297
298 private Observer<VesselPosition> createObserver() {
299 return new Observer<VesselPosition>() {
300
301 @Override
302 public void onCompleted() {
303 System.out.println("done");
304 }
305
306 @Override
307 public void onError(Throwable e) {
308 log.error(e.getMessage(), e);
309 }
310
311 @Override
312 public void onNext(VesselPosition t) {
313
314 }
315 };
316 }
317
318 private Action1<VesselPosition> addToQueue() {
319 return new Action1<VesselPosition>() {
320
321 @Override
322 public void call(VesselPosition p) {
323
324
325 if (queue.size() % 10000 == 0)
326 System.out.println("queue.size=" + queue.size());
327 queue.add(p);
328 tree = tree.add(p, Geometries.point(p.lon(), p.lat()));
329 }
330 };
331 }
332
333 private static Func1<VesselPosition, String> byIdAndTimePattern(final String timePattern) {
334 return new Func1<VesselPosition, String>() {
335 final DateTimeFormatter format = DateTimeFormat.forPattern(timePattern);
336
337 @Override
338 public String call(VesselPosition p) {
339 return p.id().uniqueId() + format.print(p.time());
340 }
341 };
342 }
343
344 public static final Func2<VesselPosition, VesselPosition, Integer> SORT_BY_TIME = new Func2<VesselPosition, VesselPosition, Integer>() {
345
346 @Override
347 public Integer call(VesselPosition p1, VesselPosition p2) {
348 return ((Long) p1.time()).compareTo(p2.time());
349 }
350 };
351
352 @Override
353 public LayerFeatures getFeatures() {
354 return LayerFeatures.builder().crs("EPSG:4326").crs("EPSG:3857").name("Drifting")
355 .queryable().build();
356 }
357
358 @Override
359 public String getInfo(Date time, WmsRequest request, final Point point, String mimeType) {
360
361 final int HOTSPOT_SIZE = 5;
362
363 final Projector projector = WmsUtil.getProjector(request);
364 final StringBuilder response = new StringBuilder();
365 response.append("<html>");
366 Observable.from(queue)
367
368 .filter(new Func1<VesselPosition, Boolean>() {
369 @Override
370 public Boolean call(VesselPosition p) {
371 Point pt = projector.toPoint(p.lat(), p.lon());
372 return Math.abs(point.x - pt.x) <= HOTSPOT_SIZE
373 && Math.abs(point.y - pt.y) <= HOTSPOT_SIZE;
374 }
375 })
376
377 .doOnNext(new Action1<VesselPosition>() {
378 @Override
379 public void call(VesselPosition p) {
380 response.append("<p>");
381 response.append("<a href=\"https://www.fleetmon.com/en/vessels?s="
382 + p.id().uniqueId() + "\">mmsi=" + p.id().uniqueId()
383 + "</a>, time=" + new Date(p.time()));
384 if (p.shipType().isPresent()) {
385 response.append(", ");
386 response.append(ShipTypeDecoder.getShipType(p.shipType().get()));
387 }
388 response.append("</p>");
389 response.append("<p>");
390 response.append(p.toString());
391 response.append("</p>");
392 }
393 })
394
395 .subscribe();
396 response.append("</html>");
397 return response.toString();
398 }
399
400 @Override
401 public void render(Graphics2D g, WmsRequest request) {
402 log.info("request=" + request);
403 log.info("drawing " + queue.size() + " positions");
404 final Projector projector = WmsUtil.getProjector(request);
405 Position a = projector.toPosition(0, 0);
406 Position b = projector.toPosition(request.getWidth(), request.getHeight());
407 Rectangle r = Geometries.rectangle(a.getLon(), b.getLat(), b.getLon(), a.getLat());
408
409 Optional<VesselPosition> last = Optional.absent();
410 Optional<Point> lastPoint = Optional.absent();
411
412
413
414
415
416
417
418
419
420
421
422
423
424 ConcurrentLinkedQueue<VesselPosition> positions = queue;
425 Point startPoint = null;
426 for (VesselPosition p : positions) {
427
428 Point point = projector.toPoint(p.lat(), p.lon());
429 if (last.isPresent() && p.id().equals(last.get().id()) && p.data().isPresent()
430 && !p.data().get().equals(p.time()) && isOkMovement(p, last.get())) {
431
432 g.setColor(Color.gray);
433 g.drawLine(lastPoint.get().x, lastPoint.get().y, point.x, point.y);
434 }
435 if (p.data().get().equals(p.time())
436 || (last.isPresent() && !isOkMovement(p, last.get()))) {
437 g.setColor(Color.red);
438 g.drawRect(point.x, point.y, 1, 1);
439 startPoint = point;
440 } else if (startPoint != null) {
441
442 g.setColor(Color.darkGray);
443 g.drawRect(point.x, point.y, 1, 1);
444
445
446
447 g.setColor(Color.red);
448 g.drawRect(startPoint.x, startPoint.y, 1, 1);
449 }
450 last = Optional.of(p);
451 lastPoint = Optional.of(point);
452
453 }
454 log.info("drawn");
455
456 }
457
458 private static boolean isOkMovement(VesselPosition current, VesselPosition last) {
459 return Position.create(current.lat(), current.lon()).getDistanceToKm(
460 Position.create(last.lat(), last.lon())) < 15;
461 }
462
463 private static void sortFile(String filename) throws FileNotFoundException, IOException {
464 Comparator<LineAndTime> comparator = new Comparator<LineAndTime>() {
465 @Override
466 public int compare(LineAndTime line1, LineAndTime line2) {
467 return ((Long) line1.getTime()).compareTo(line2.getTime());
468 }
469 };
470 final File in = new File(filename);
471 final File outFile = new File(in.getParentFile(), "sorted-" + in.getName());
472 if (outFile.exists()) {
473 log.info("file exists: " + outFile);
474 return;
475 }
476 final OutputStreamWriter out = new OutputStreamWriter(new GZIPOutputStream(
477 new FileOutputStream(outFile)), StandardCharsets.UTF_8);
478
479 Streams
480
481 .nmeaFromGzip(filename)
482
483 .flatMap(Streams.toLineAndTime())
484
485 .lift(new SortOperator<LineAndTime>(comparator, 20000000))
486
487 .doOnCompleted(new Action0() {
488 @Override
489 public void call() {
490 try {
491 out.close();
492 } catch (IOException e) {
493 }
494 }
495 }).forEach(new Action1<LineAndTime>() {
496 @Override
497 public void call(LineAndTime line) {
498 try {
499 out.write(line.getLine());
500 out.write('\n');
501 } catch (IOException e) {
502 throw new RuntimeException(e);
503 }
504 }
505 });
506 }
507
508 private static void sortFiles() throws FileNotFoundException, IOException {
509
510 File directory = new File("/media/analysis/nmea/2014");
511 Preconditions.checkArgument(directory.exists());
512 File[] files = directory.listFiles(new FileFilter() {
513 @Override
514 public boolean accept(File f) {
515 return f.getName().startsWith("NMEA_") && f.getName().endsWith(".gz");
516 }
517 });
518 int count = 0;
519
520 Arrays.sort(files, new Comparator<File>() {
521 @Override
522 public int compare(File f1, File f2) {
523 return f1.getPath().compareTo(f2.getPath());
524 }
525 });
526
527 for (File file : files) {
528 count++;
529 log.info("sorting " + count + " of " + files.length + ": " + file);
530 sortFile(file.getAbsolutePath());
531 }
532 }
533
534 private static <T> Func1<Iterable<T>, Observable<T>> iterableToObservable() {
535 return new Func1<Iterable<T>, Observable<T>>() {
536 @Override
537 public Observable<T> call(Iterable<T> iterable) {
538 return Observable.from(iterable);
539 }
540 };
541 }
542
543 private static Func1<Observable<String>, Observable<VesselPosition>> detectDrifters() {
544 return new Func1<Observable<String>, Observable<VesselPosition>>() {
545 @Override
546 public Observable<VesselPosition> call(Observable<String> filenames) {
547 return getDriftingPositions(filenames);
548 }
549 };
550 }
551
552 public static void main(String[] args) throws FileNotFoundException, IOException,
553 InterruptedException {
554
555 getDrifters()
556
557 .lift(Logging.<VesselPosition> logger().showCount()
558 .showRateSinceStart("msgPerSecond").showMemory().every(5000).log())
559
560 .subscribe(new Subscriber<VesselPosition>() {
561
562 @Override
563 public void onStart() {
564 }
565
566 @Override
567 public void onCompleted() {
568
569
570 }
571
572 @Override
573 public void onError(Throwable e) {
574 log.error(e.getMessage(), e);
575 throw new RuntimeException(e);
576 }
577
578 @Override
579 public void onNext(VesselPosition vp) {
580 if (vp.shipType().isPresent() && false) {
581 System.out.println(vp.id() + "," + vp.shipType() + ","
582 + ShipTypeDecoder.getShipType(vp.shipType().get())
583 + ", length=" + vp.lengthMetres() + ", cog=" + vp.cogDegrees()
584 + ", heading=" + vp.headingDegrees() + ", speedKnots="
585 + (vp.speedMetresPerSecond().get() / 1852.0 * 3600));
586 }
587 }
588 });
589
590 Thread.sleep(10000000);
591 }
592
593 }