1 package au.gov.amsa.ais.rx;
2
3 import static com.google.common.base.Optional.absent;
4 import static com.google.common.base.Optional.of;
5
6 import java.io.BufferedInputStream;
7 import java.io.BufferedReader;
8 import java.io.File;
9 import java.io.FileInputStream;
10 import java.io.IOException;
11 import java.io.InputStream;
12 import java.io.InputStreamReader;
13 import java.io.PrintStream;
14 import java.io.Reader;
15 import java.net.Socket;
16 import java.net.UnknownHostException;
17 import java.nio.charset.Charset;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.function.Function;
24 import java.util.regex.Pattern;
25 import java.util.stream.Collectors;
26 import java.util.zip.GZIPInputStream;
27
28 import org.apache.commons.io.FileUtils;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import com.github.davidmoten.rx.Checked;
33 import com.github.davidmoten.rx.slf4j.Logging;
34 import com.google.common.base.Optional;
35
36 import au.gov.amsa.ais.AisMessage;
37 import au.gov.amsa.ais.AisNmeaBuffer;
38 import au.gov.amsa.ais.AisNmeaMessage;
39 import au.gov.amsa.ais.AisParseException;
40 import au.gov.amsa.ais.LineAndTime;
41 import au.gov.amsa.ais.Timestamped;
42 import au.gov.amsa.ais.message.AisPosition;
43 import au.gov.amsa.ais.message.AisPositionA;
44 import au.gov.amsa.risky.format.AisClass;
45 import au.gov.amsa.risky.format.BinaryFixes;
46 import au.gov.amsa.risky.format.BinaryFixesFormat;
47 import au.gov.amsa.risky.format.BinaryFixesWriter;
48 import au.gov.amsa.risky.format.Fix;
49 import au.gov.amsa.risky.format.FixImpl;
50 import au.gov.amsa.risky.format.NavigationalStatus;
51 import au.gov.amsa.streams.Strings;
52 import au.gov.amsa.util.Files;
53 import au.gov.amsa.util.nmea.NmeaMessage;
54 import au.gov.amsa.util.nmea.NmeaMessageParseException;
55 import au.gov.amsa.util.nmea.NmeaUtil;
56 import rx.Observable;
57 import rx.Observable.OnSubscribe;
58 import rx.Observable.Transformer;
59 import rx.Observer;
60 import rx.Scheduler;
61 import rx.Subscriber;
62 import rx.Subscription;
63 import rx.functions.Action1;
64 import rx.functions.Func0;
65 import rx.functions.Func1;
66 import rx.functions.Func2;
67
68 public class Streams {
69
70 public static final int BUFFER_SIZE = 100;
71 private static final Charset UTF8 = Charset.forName("UTF-8");
72
73 private static Logger log = LoggerFactory.getLogger(Streams.class);
74
75 public static Observable<String> connect(String host, int port) {
76 return connect(new HostPort(host, port));
77 }
78
79 private static Observable<String> connect(HostPort socket) {
80 return connectOnce(socket).timeout(1, TimeUnit.MINUTES).retry();
81 }
82
83 public static Observable<TimestampedAndLine<AisMessage>> connectAndExtract(String host,
84 int port) {
85 return extract(connect(host, port));
86 }
87
88 public static Observable<TimestampedAndLine<AisMessage>> extract(
89 Observable<String> rawAisNmea) {
90 return rawAisNmea
91
92 .map(LINE_TO_NMEA_MESSAGE)
93
94 .compose(Streams.<NmeaMessage> valueIfPresent())
95
96 .compose(aggregateMultiLineNmea(BUFFER_SIZE))
97
98 .map(TO_AIS_MESSAGE_AND_LINE);
99 }
100
101 public static Observable<TimestampedAndLines<AisMessage>> extractWithLines(
102 Observable<String> rawAisNmea) {
103 return rawAisNmea
104
105 .map(LINE_TO_NMEA_MESSAGE)
106
107 .compose(Streams.<NmeaMessage> valueIfPresent())
108
109 .compose(addToBuffer(BUFFER_SIZE))
110
111 .map(TO_AIS_MESSAGE_AND_LINES);
112 }
113
114 public static Observable<Timestamped<AisMessage>> extractMessages(
115 Observable<String> rawAisNmea) {
116 return rawAisNmea.map(LINE_TO_NMEA_MESSAGE)
117
118 .compose(Streams.<NmeaMessage> valueIfPresent())
119
120 .compose(aggregateMultiLineNmea(BUFFER_SIZE))
121
122 .map(TO_AIS_MESSAGE)
123
124 .compose(Streams.<Timestamped<AisMessage>> valueIfPresent());
125 }
126
127 public static <T> Func1<Optional<T>, Boolean> isPresent() {
128 return x -> x.isPresent();
129 }
130
131 public static <T> Func1<Optional<T>, T> toValue() {
132 return x -> x.get();
133
134 }
135
136 public static <T> Transformer<Optional<T>, T> valueIfPresent() {
137 return o -> o.filter(Streams.<T> isPresent()).map(Streams.<T> toValue());
138 }
139
140 public static Observable<Fix> extractFixes(Observable<String> rawAisNmea) {
141 return extractMessages(rawAisNmea).flatMap(TO_FIX, 1);
142 }
143
144 private static final Func1<Timestamped<AisMessage>, Observable<Fix>> TO_FIX = m -> {
145 try {
146 if (m.message() instanceof AisPosition) {
147 AisPosition a = (AisPosition) m.message();
148 if (a.getLatitude() == null || a.getLongitude() == null || a.getLatitude() < -90
149 || a.getLatitude() > 90 || a.getLongitude() < -180
150 || a.getLongitude() > 180)
151 return Observable.empty();
152 else {
153 Optional<NavigationalStatus> nav;
154 if (a instanceof AisPositionA) {
155 AisPositionA p = (AisPositionA) a;
156 nav = of(NavigationalStatus.values()[p.getNavigationalStatus().ordinal()]);
157 } else
158 nav = absent();
159
160 Optional<Float> sog;
161 if (a.getSpeedOverGroundKnots() == null)
162 sog = absent();
163 else
164 sog = of((a.getSpeedOverGroundKnots().floatValue()));
165 Optional<Float> cog;
166 if (a.getCourseOverGround() == null || a.getCourseOverGround() >= 360
167 || a.getCourseOverGround() < 0)
168 cog = absent();
169 else
170 cog = of((a.getCourseOverGround().floatValue()));
171 Optional<Float> heading;
172 if (a.getTrueHeading() == null || a.getTrueHeading() >= 360
173 || a.getTrueHeading() < 0)
174 heading = absent();
175 else
176 heading = of((a.getTrueHeading().floatValue()));
177
178 AisClass aisClass;
179 if (a instanceof AisPositionA)
180 aisClass = AisClass.A;
181 else
182 aisClass = AisClass.B;
183 Optional<Short> src;
184 if (a.getSource() != null) {
185
186 src = of((short) BinaryFixes.SOURCE_PRESENT_BUT_UNKNOWN);
187 } else
188 src = absent();
189
190
191 Optional<Integer> latency = absent();
192
193 Fix f = new FixImpl(a.getMmsi(), a.getLatitude().floatValue(),
194 a.getLongitude().floatValue(), m.time(), latency, src, nav, sog, cog,
195 heading, aisClass);
196 return Observable.just(f);
197 }
198 } else
199 return Observable.empty();
200 } catch (RuntimeException e) {
201 log.warn(e.getMessage(), e);
202 return Observable.empty();
203 }
204 };
205
206 public static Observable<String> nmeaFrom(final File file) {
207 return Observable.using(
208
209 Checked.f0(() -> new FileInputStream(file)), is -> nmeaFrom(is),
210
211 is -> {
212 try {
213 is.close();
214 } catch (IOException e) {
215
216 }
217 } , true);
218 }
219
220 public static Observable<String> nmeaFrom(InputStream is) {
221 return Strings.split(Strings.from(new InputStreamReader(is, UTF8)), "\n");
222 }
223
224 public static Observable<String> nmeaFromGzip(String filename) {
225 return nmeaFromGzip(new File(filename));
226 }
227
228 public static Observable<Observable<String>> nmeasFromGzip(Observable<File> files) {
229 return files.map(f -> nmeaFromGzip(f.getPath()));
230 }
231
232 public static Observable<String> nmeaFromGzip(final File file) {
233
234 Func0<Reader> resourceFactory = () -> {
235 try {
236 return new InputStreamReader(new GZIPInputStream(new BufferedInputStream(new FileInputStream(file), 1024 * 1024)), UTF8);
237 } catch (IOException e) {
238 throw new RuntimeException(e);
239 }
240 };
241
242 Func1<Reader, Observable<String>> observableFactory = reader -> Strings
243 .split(Strings.from(reader), "\n");
244
245 Action1<Reader> disposeAction = reader -> {
246 try {
247 reader.close();
248 } catch (IOException e) {
249
250 }
251 };
252 return Observable.using(resourceFactory, observableFactory, disposeAction, true);
253 }
254
255 public static void print(Observable<?> stream, final PrintStream out) {
256 stream.subscribe(new Observer<Object>() {
257
258 @Override
259 public void onCompleted() {
260 }
261
262 @Override
263 public void onError(Throwable e) {
264 e.printStackTrace();
265 }
266
267 @Override
268 public void onNext(Object line) {
269 out.println(line);
270 }
271 });
272 }
273
274 public static void print(Observable<?> stream) {
275 print(stream, System.out);
276 }
277
278 public static final Func1<String, Optional<NmeaMessage>> LINE_TO_NMEA_MESSAGE = line -> {
279 try {
280 return Optional.of(NmeaUtil.parseNmea(line));
281 } catch (RuntimeException e) {
282 return Optional.absent();
283 }
284 };
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316 public static final Func1<String, Observable<LineAndTime>> toLineAndTime() {
317 return new Func1<String, Observable<LineAndTime>>() {
318
319 @Override
320 public Observable<LineAndTime> call(String line) {
321 try {
322 Long t = NmeaUtil.parseNmea(line).getUnixTimeMillis();
323 if (t == null)
324 return Observable.empty();
325 else
326 return Observable.just(new LineAndTime(line, t));
327 } catch (NmeaMessageParseException e) {
328 return Observable.empty();
329 } catch (RuntimeException e) {
330 return Observable.empty();
331 }
332
333 }
334 };
335 }
336
337 public static class TimestampedAndLines<T extends AisMessage> {
338 private final Optional<Timestamped<T>> message;
339 private final List<String> lines;
340 private final String error;
341
342 public TimestampedAndLines(Optional<Timestamped<T>> message, List<String> lines,
343 String error) {
344 this.message = message;
345 this.lines = lines;
346 this.error = error;
347 }
348
349 public Optional<Timestamped<T>> getMessage() {
350 return message;
351 }
352
353 public List<String> getLines() {
354 return lines;
355 }
356
357 public String getError() {
358 return error;
359 }
360
361 @Override
362 public String toString() {
363 StringBuilder builder = new StringBuilder();
364 if (message.isPresent())
365 builder.append("message=" + message);
366 else
367 builder.append("error=" + error);
368 builder.append(", lines=");
369 builder.append(lines);
370 return builder.toString();
371 }
372
373 }
374
375 public static class TimestampedAndLine<T extends AisMessage> {
376 private final Optional<Timestamped<T>> message;
377 private final String line;
378 private final String error;
379
380 public TimestampedAndLine(Optional<Timestamped<T>> message, String line, String error) {
381 this.message = message;
382 this.line = line;
383 this.error = error;
384 }
385
386 public Optional<Timestamped<T>> getMessage() {
387 return message;
388 }
389
390 public String getLine() {
391 return line;
392 }
393
394 public String getError() {
395 return error;
396 }
397
398 @Override
399 public String toString() {
400 StringBuilder builder = new StringBuilder();
401 if (message != null)
402 builder.append("message=" + message);
403 else
404 builder.append("error=" + error);
405 builder.append(", line=");
406 builder.append(line);
407 return builder.toString();
408 }
409
410 }
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440 public static final Func1<NmeaMessage, Optional<Timestamped<AisMessage>>> TO_AIS_MESSAGE = new Func1<NmeaMessage, Optional<Timestamped<AisMessage>>>() {
441
442 @Override
443 public Optional<Timestamped<AisMessage>> call(NmeaMessage nmea) {
444 try {
445 AisNmeaMessage n = new AisNmeaMessage(nmea);
446 Timestamped<AisMessage> m = n.getTimestampedMessage();
447
448
449
450
451
452
453
454
455
456 return Optional.of(m);
457 } catch (RuntimeException e) {
458 return Optional.absent();
459 }
460 }
461 };
462
463 private static boolean containsWeirdCharacters(String s) {
464 if (s == null)
465 return false;
466 else {
467 for (char ch : s.toCharArray()) {
468 if (ch < 32 && ch != 10 && ch != 13) {
469 log.warn("ch=" + (int) ch);
470 return true;
471 }
472 }
473 }
474 return false;
475 }
476
477 public static final Func1<NmeaMessage, TimestampedAndLine<AisMessage>> TO_AIS_MESSAGE_AND_LINE = nmea -> {
478 String line = nmea.toLine();
479 try {
480 AisNmeaMessage n = new AisNmeaMessage(nmea);
481 return new TimestampedAndLine<AisMessage>(
482 Optional.of(n.getTimestampedMessage(System.currentTimeMillis())), line, null);
483 } catch (AisParseException e) {
484 return new TimestampedAndLine<AisMessage>(Optional.<Timestamped<AisMessage>> absent(),
485 line, e.getMessage());
486 } catch (RuntimeException e) {
487 log.warn(e.getMessage(), e);
488 throw e;
489 }
490 };
491
492 public static final Func1<Optional<List<NmeaMessage>>, TimestampedAndLines<AisMessage>> TO_AIS_MESSAGE_AND_LINES = nmeas -> {
493 if (nmeas.isPresent()) {
494 List<String> lines = nmeas.get()
495 .stream()
496 .map(new Function<NmeaMessage, String>() {
497 @Override
498 public String apply(NmeaMessage t) {
499 return t.toLine();
500 }
501 }).collect(Collectors.toList());
502 Optional<NmeaMessage> concat = AisNmeaBuffer.concatenateMessages(nmeas.get());
503 if (concat.isPresent()) {
504 try {
505 AisNmeaMessage n = new AisNmeaMessage(concat.get());
506 return new TimestampedAndLines<AisMessage>(
507 Optional.of(n.getTimestampedMessage(System.currentTimeMillis())), lines,
508 null);
509 } catch (AisParseException e) {
510 return new TimestampedAndLines<AisMessage>(
511 Optional.<Timestamped<AisMessage>> absent(), lines, e.getMessage());
512 }
513 } else {
514 return new TimestampedAndLines<AisMessage>(
515 Optional.<Timestamped<AisMessage>> absent(), lines, "could not concat");
516 }
517 } else {
518 return new TimestampedAndLines<AisMessage>(Optional.absent(), Collections.emptyList(),
519 null);
520 }
521 };
522
523 public static final Transformer<NmeaMessage, Optional<List<NmeaMessage>>> addToBuffer(
524 int bufferSize) {
525 return new Transformer<NmeaMessage, Optional<List<NmeaMessage>>>() {
526
527 @Override
528 public Observable<Optional<List<NmeaMessage>>> call(Observable<NmeaMessage> o) {
529 return Observable.defer(() -> {
530 AisNmeaBuffer buffer = new AisNmeaBuffer(bufferSize);
531
532 return o.map(nmea -> buffer.add(nmea));
533 });
534 }
535
536 };
537 }
538
539 public static final Transformer<NmeaMessage, NmeaMessage> aggregateMultiLineNmea(
540 int bufferSize) {
541 return new Transformer<NmeaMessage, NmeaMessage>() {
542
543 @Override
544 public Observable<NmeaMessage> call(Observable<NmeaMessage> o) {
545 return Observable.defer(() -> {
546 AisNmeaBuffer buffer = new AisNmeaBuffer(bufferSize);
547
548 return o.flatMap(nmea -> {
549 return addToBuffer(buffer, nmea);
550 } , 1);
551 });
552 }
553
554 };
555 }
556
557 public static final Transformer<NmeaMessage, NmeaMessage> aggregateMultiLineNmeaWithLines(
558 int bufferSize) {
559 return new Transformer<NmeaMessage, NmeaMessage>() {
560
561 @Override
562 public Observable<NmeaMessage> call(Observable<NmeaMessage> o) {
563 return Observable.defer(() -> {
564 AisNmeaBuffer buffer = new AisNmeaBuffer(bufferSize);
565
566 return o.flatMap(nmea -> {
567 return addToBuffer(buffer, nmea);
568 } , 1);
569 });
570 }
571
572 };
573 }
574
575 private static Observable<? extends NmeaMessage> addToBuffer(AisNmeaBuffer buffer,
576 NmeaMessage nmea) {
577 try {
578 Optional<List<NmeaMessage>> list = buffer.add(nmea);
579 if (!list.isPresent())
580 return Observable.empty();
581 else {
582 Optional<NmeaMessage> concat = AisNmeaBuffer.concatenateMessages(list.get());
583 if (concat.isPresent())
584 return Observable.just(concat.get());
585 else
586 return Observable.empty();
587 }
588 } catch (RuntimeException e) {
589 log.warn(e.getMessage(), e);
590 return Observable.empty();
591 }
592 }
593
594
595
596 public static Observable<String> connectOnce(final HostPort hostPort) {
597
598 return Observable.unsafeCreate(new OnSubscribe<String>() {
599
600 private Socket socket = null;
601
602 private BufferedReader reader = null;
603
604 @Override
605 public void call(Subscriber<? super String> subscriber) {
606 try {
607 synchronized (this) {
608 log.info("creating new socket");
609 socket = createSocket(hostPort.getHost(), hostPort.getPort());
610 }
611 log.info("waiting one second before attempting connect");
612 Thread.sleep(1000);
613 InputStream is = socket.getInputStream();
614 reader = new BufferedReader(new InputStreamReader(is, UTF8));
615 subscriber.add(createSubscription());
616 while (!subscriber.isUnsubscribed()) {
617 String line;
618 try {
619 line = reader.readLine();
620 } catch (IOException e) {
621 if (subscriber.isUnsubscribed())
622
623
624 return;
625 else
626 throw e;
627 }
628 if (line != null)
629 subscriber.onNext(line);
630 else {
631
632
633 cancel();
634 subscriber.onCompleted();
635 }
636 }
637 } catch (Exception e) {
638 log.warn(e.getMessage(), e);
639 cancel();
640 subscriber.onError(e);
641 }
642 }
643
644 private Subscription createSubscription() {
645 return new Subscription() {
646
647 private final AtomicBoolean subscribed = new AtomicBoolean(true);
648
649 @Override
650 public boolean isUnsubscribed() {
651 return !subscribed.get();
652 }
653
654 @Override
655 public void unsubscribe() {
656 subscribed.set(false);
657 cancel();
658 }
659 };
660 }
661
662 public void cancel() {
663 log.info("cancelling socket read");
664
665
666
667 synchronized (this) {
668 if (socket != null) {
669 if (reader != null)
670 try {
671 reader.close();
672 } catch (IOException e) {
673
674 }
675 try {
676 socket.close();
677
678 socket = null;
679 } catch (IOException e) {
680
681 }
682 }
683 }
684 }
685 });
686 }
687
688 private static Socket createSocket(final String host, final int port) {
689 try {
690 return new Socket(host, port);
691 } catch (UnknownHostException e) {
692 throw new RuntimeException(e);
693 } catch (IOException e) {
694 throw new RuntimeException(e);
695 }
696 }
697
698 public static Func1<List<File>, Observable<Integer>> extractFixesFromNmeaGzAndAppendToFile(
699 final int linesPerProcessor, final Scheduler scheduler,
700 final Func1<Fix, String> fileMapper, final int writeBufferSize,
701 final Action1<File> logger) {
702 return files -> {
703 Observable<Fix> fixes = Streams
704 .extractFixes(
705 Observable.from(files)
706
707 .doOnNext(logger)
708
709 .concatMap(file -> Streams.nmeaFromGzip(file.getAbsolutePath())
710 .doOnError(e -> log.warn("problem reading file " + file
711 + ": " + e.getMessage()))
712 .onErrorResumeNext(Observable.empty())));
713 return BinaryFixesWriter
714 .writeFixes(fileMapper, fixes, writeBufferSize, false,
715 BinaryFixesFormat.WITHOUT_MMSI)
716
717 .reduce(0, countFixes())
718
719 .subscribeOn(scheduler);
720 };
721 }
722
723 private static Func2<Integer, List<Fix>, Integer> countFixes() {
724 return (count, fixes) -> count + fixes.size();
725 }
726
727 public static Observable<Integer> writeFixesFromNmeaGz(File input, Pattern inputPattern,
728 File output, int logEvery, int writeBufferSize, Scheduler scheduler,
729 int linesPerProcessor, long downSampleIntervalMs, Func1<Fix, String> fileMapper) {
730
731 final List<File> fileList = Files.find(input, inputPattern);
732 Observable<File> files = Observable.from(fileList);
733
734
735
736
737 Action1<File> logger = new Action1<File>() {
738 AtomicInteger count = new AtomicInteger();
739 Long start = null;
740
741 @Override
742 public void call(File file) {
743 if (start == null)
744 start = System.currentTimeMillis();
745 int num = count.incrementAndGet();
746 double filesPerSecond = (System.currentTimeMillis() - start) / (double) num
747 / 1000.0;
748 log.info("file " + num + " of " + fileList.size() + ", " + file.getName()
749 + ", rateFilesPerSecond=" + filesPerSecond);
750 }
751 };
752
753 deleteDirectory(output);
754
755 return files
756
757 .buffer(Math.max(fileList.size() / Runtime.getRuntime().availableProcessors(), 1))
758
759 .flatMap(extractFixesFromNmeaGzAndAppendToFile(linesPerProcessor, scheduler,
760 fileMapper, writeBufferSize, logger), 1)
761
762 .scan(0, (a, b) -> a + b)
763
764 .lift(Logging.<Integer> logger().showCount().showMemory()
765 .showRateSince("rate", 5000).every(logEvery).log())
766
767 .last()
768
769
770 .doOnCompleted(
771 () -> log.info("completed converting nmea to binary fixes, starting sort"))
772 .concatWith(BinaryFixes.sortBinaryFixFilesByTime(output, downSampleIntervalMs,
773 scheduler));
774 }
775
776 private static void deleteDirectory(File output) {
777 try {
778 FileUtils.deleteDirectory(output);
779 } catch (IOException e) {
780 throw new RuntimeException(e);
781 }
782 }
783
784 public static void main(String[] args) {
785 Streams.nmeaFromGzip(new File("/media/an/nmea/2015/NMEA_ITU_20150521.gz"))
786 .compose(o -> Streams.extract(o)).takeLast(10000).forEach(System.out::println);
787
788 }
789 }