View Javadoc
1   package au.gov.amsa.ais.rx;
2   
3   import static com.google.common.base.Optional.absent;
4   import static com.google.common.base.Optional.of;
5   
6   import java.io.BufferedInputStream;
7   import java.io.BufferedReader;
8   import java.io.File;
9   import java.io.FileInputStream;
10  import java.io.IOException;
11  import java.io.InputStream;
12  import java.io.InputStreamReader;
13  import java.io.PrintStream;
14  import java.io.Reader;
15  import java.net.Socket;
16  import java.net.UnknownHostException;
17  import java.nio.charset.Charset;
18  import java.util.Collections;
19  import java.util.List;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.atomic.AtomicBoolean;
22  import java.util.concurrent.atomic.AtomicInteger;
23  import java.util.function.Function;
24  import java.util.regex.Pattern;
25  import java.util.stream.Collectors;
26  import java.util.zip.GZIPInputStream;
27  
28  import org.apache.commons.io.FileUtils;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import com.github.davidmoten.rx.Checked;
33  import com.github.davidmoten.rx.slf4j.Logging;
34  import com.google.common.base.Optional;
35  
36  import au.gov.amsa.ais.AisMessage;
37  import au.gov.amsa.ais.AisNmeaBuffer;
38  import au.gov.amsa.ais.AisNmeaMessage;
39  import au.gov.amsa.ais.AisParseException;
40  import au.gov.amsa.ais.LineAndTime;
41  import au.gov.amsa.ais.Timestamped;
42  import au.gov.amsa.ais.message.AisPosition;
43  import au.gov.amsa.ais.message.AisPositionA;
44  import au.gov.amsa.risky.format.AisClass;
45  import au.gov.amsa.risky.format.BinaryFixes;
46  import au.gov.amsa.risky.format.BinaryFixesFormat;
47  import au.gov.amsa.risky.format.BinaryFixesWriter;
48  import au.gov.amsa.risky.format.Fix;
49  import au.gov.amsa.risky.format.FixImpl;
50  import au.gov.amsa.risky.format.NavigationalStatus;
51  import au.gov.amsa.streams.Strings;
52  import au.gov.amsa.util.Files;
53  import au.gov.amsa.util.nmea.NmeaMessage;
54  import au.gov.amsa.util.nmea.NmeaMessageParseException;
55  import au.gov.amsa.util.nmea.NmeaUtil;
56  import rx.Observable;
57  import rx.Observable.OnSubscribe;
58  import rx.Observable.Transformer;
59  import rx.Observer;
60  import rx.Scheduler;
61  import rx.Subscriber;
62  import rx.Subscription;
63  import rx.functions.Action1;
64  import rx.functions.Func0;
65  import rx.functions.Func1;
66  import rx.functions.Func2;
67  
68  public class Streams {
69  
70      public static final int BUFFER_SIZE = 100;
71      private static final Charset UTF8 = Charset.forName("UTF-8");
72  
73      private static Logger log = LoggerFactory.getLogger(Streams.class);
74  
75      public static Observable<String> connect(String host, int port) {
76          return connect(new HostPort(host, port));
77      }
78  
79      private static Observable<String> connect(HostPort socket) {
80          return connectOnce(socket).timeout(1, TimeUnit.MINUTES).retry();
81      }
82  
83      public static Observable<TimestampedAndLine<AisMessage>> connectAndExtract(String host,
84              int port) {
85          return extract(connect(host, port));
86      }
87  
88      public static Observable<TimestampedAndLine<AisMessage>> extract(
89              Observable<String> rawAisNmea) {
90          return rawAisNmea
91                  // parse nmea
92                  .map(LINE_TO_NMEA_MESSAGE)
93                  // if error filter out
94                  .compose(Streams.<NmeaMessage> valueIfPresent())
95                  // aggregate multi line nmea
96                  .compose(aggregateMultiLineNmea(BUFFER_SIZE))
97                  // parse ais message and include line
98                  .map(TO_AIS_MESSAGE_AND_LINE);
99      }
100 
101     public static Observable<TimestampedAndLines<AisMessage>> extractWithLines(
102             Observable<String> rawAisNmea) {
103         return rawAisNmea
104                 // parse nmea
105                 .map(LINE_TO_NMEA_MESSAGE)
106                 // if error filter out
107                 .compose(Streams.<NmeaMessage> valueIfPresent())
108                 // aggregate multi line nmea
109                 .compose(addToBuffer(BUFFER_SIZE))
110                 // parse ais message and include line
111                 .map(TO_AIS_MESSAGE_AND_LINES);
112     }
113 
114     public static Observable<Timestamped<AisMessage>> extractMessages(
115             Observable<String> rawAisNmea) {
116         return rawAisNmea.map(LINE_TO_NMEA_MESSAGE)
117                 //
118                 .compose(Streams.<NmeaMessage> valueIfPresent())
119                 //
120                 .compose(aggregateMultiLineNmea(BUFFER_SIZE))
121                 //
122                 .map(TO_AIS_MESSAGE)
123                 //
124                 .compose(Streams.<Timestamped<AisMessage>> valueIfPresent());
125     }
126 
127     public static <T> Func1<Optional<T>, Boolean> isPresent() {
128         return x -> x.isPresent();
129     }
130 
131     public static <T> Func1<Optional<T>, T> toValue() {
132         return x -> x.get();
133 
134     }
135 
136     public static <T> Transformer<Optional<T>, T> valueIfPresent() {
137         return o -> o.filter(Streams.<T> isPresent()).map(Streams.<T> toValue());
138     }
139 
140     public static Observable<Fix> extractFixes(Observable<String> rawAisNmea) {
141         return extractMessages(rawAisNmea).flatMap(TO_FIX, 1);
142     }
143 
144     private static final Func1<Timestamped<AisMessage>, Observable<Fix>> TO_FIX = m -> {
145         try {
146             if (m.message() instanceof AisPosition) {
147                 AisPosition a = (AisPosition) m.message();
148                 if (a.getLatitude() == null || a.getLongitude() == null || a.getLatitude() < -90
149                         || a.getLatitude() > 90 || a.getLongitude() < -180
150                         || a.getLongitude() > 180)
151                     return Observable.empty();
152                 else {
153                     Optional<NavigationalStatus> nav;
154                     if (a instanceof AisPositionA) {
155                         AisPositionA p = (AisPositionA) a;
156                         nav = of(NavigationalStatus.values()[p.getNavigationalStatus().ordinal()]);
157                     } else
158                         nav = absent();
159 
160                     Optional<Float> sog;
161                     if (a.getSpeedOverGroundKnots() == null)
162                         sog = absent();
163                     else
164                         sog = of((a.getSpeedOverGroundKnots().floatValue()));
165                     Optional<Float> cog;
166                     if (a.getCourseOverGround() == null || a.getCourseOverGround() >= 360
167                             || a.getCourseOverGround() < 0)
168                         cog = absent();
169                     else
170                         cog = of((a.getCourseOverGround().floatValue()));
171                     Optional<Float> heading;
172                     if (a.getTrueHeading() == null || a.getTrueHeading() >= 360
173                             || a.getTrueHeading() < 0)
174                         heading = absent();
175                     else
176                         heading = of((a.getTrueHeading().floatValue()));
177 
178                     AisClass aisClass;
179                     if (a instanceof AisPositionA)
180                         aisClass = AisClass.A;
181                     else
182                         aisClass = AisClass.B;
183                     Optional<Short> src;
184                     if (a.getSource() != null) {
185                         // TODO decode
186                         src = of((short) BinaryFixes.SOURCE_PRESENT_BUT_UNKNOWN);
187                     } else
188                         src = absent();
189 
190                     // TODO latency
191                     Optional<Integer> latency = absent();
192 
193                     Fix f = new FixImpl(a.getMmsi(), a.getLatitude().floatValue(),
194                             a.getLongitude().floatValue(), m.time(), latency, src, nav, sog, cog,
195                             heading, aisClass);
196                     return Observable.just(f);
197                 }
198             } else
199                 return Observable.empty();
200         } catch (RuntimeException e) {
201             log.warn(e.getMessage(), e);
202             return Observable.empty();
203         }
204     };
205 
206     public static Observable<String> nmeaFrom(final File file) {
207         return Observable.using(
208                 //
209                 Checked.f0(() -> new FileInputStream(file)), is -> nmeaFrom(is),
210                 //
211                 is -> {
212                     try {
213                         is.close();
214                     } catch (IOException e) {
215                         // don't care
216                     }
217                 } , true);
218     }
219 
220     public static Observable<String> nmeaFrom(InputStream is) {
221         return Strings.split(Strings.from(new InputStreamReader(is, UTF8)), "\n");
222     }
223 
224     public static Observable<String> nmeaFromGzip(String filename) {
225         return nmeaFromGzip(new File(filename));
226     }
227 
228     public static Observable<Observable<String>> nmeasFromGzip(Observable<File> files) {
229         return files.map(f -> nmeaFromGzip(f.getPath()));
230     }
231 
232     public static Observable<String> nmeaFromGzip(final File file) {
233 
234         Func0<Reader> resourceFactory = () -> {
235             try {
236                 return new InputStreamReader(new GZIPInputStream(new BufferedInputStream(new FileInputStream(file), 1024 * 1024)), UTF8);
237             } catch (IOException e) {
238                 throw new RuntimeException(e);
239             }
240         };
241 
242         Func1<Reader, Observable<String>> observableFactory = reader -> Strings
243                 .split(Strings.from(reader), "\n");
244 
245         Action1<Reader> disposeAction = reader -> {
246             try {
247                 reader.close();
248             } catch (IOException e) {
249                 // ignore
250             }
251         };
252         return Observable.using(resourceFactory, observableFactory, disposeAction, true);
253     }
254 
255     public static void print(Observable<?> stream, final PrintStream out) {
256         stream.subscribe(new Observer<Object>() {
257 
258             @Override
259             public void onCompleted() {
260             }
261 
262             @Override
263             public void onError(Throwable e) {
264                 e.printStackTrace();
265             }
266 
267             @Override
268             public void onNext(Object line) {
269                 out.println(line);
270             }
271         });
272     }
273 
274     public static void print(Observable<?> stream) {
275         print(stream, System.out);
276     }
277 
278     public static final Func1<String, Optional<NmeaMessage>> LINE_TO_NMEA_MESSAGE = line -> {
279         try {
280             return Optional.of(NmeaUtil.parseNmea(line));
281         } catch (RuntimeException e) {
282             return Optional.absent();
283         }
284     };
285 
286     // public static final Func1<String, Observable<NmeaMessage>>
287     // toNmeaMessage() {
288     // return toNmeaMessage(false);
289     // }
290     //
291     // public static final Func1<String, Observable<NmeaMessage>> toNmeaMessage(
292     // final boolean logWarnings) {
293     // return new Func1<String, Observable<NmeaMessage>>() {
294     //
295     // @Override
296     // public Observable<NmeaMessage> call(String line) {
297     // try {
298     // return Observable.just(NmeaUtil.parseNmea(line));
299     // } catch (NmeaMessageParseException e) {
300     // if (logWarnings) {
301     // log.warn(e.getMessage());
302     // log.warn("LINE=" + line);
303     // }
304     // return Observable.empty();
305     // } catch (RuntimeException e) {
306     // if (logWarnings) {
307     // log.warn(e.getMessage());
308     // log.warn("LINE=" + line);
309     // }
310     // return Observable.empty();
311     // }
312     // }
313     // };
314     // }
315 
316     public static final Func1<String, Observable<LineAndTime>> toLineAndTime() {
317         return new Func1<String, Observable<LineAndTime>>() {
318 
319             @Override
320             public Observable<LineAndTime> call(String line) {
321                 try {
322                     Long t = NmeaUtil.parseNmea(line).getUnixTimeMillis();
323                     if (t == null)
324                         return Observable.empty();
325                     else
326                         return Observable.just(new LineAndTime(line, t));
327                 } catch (NmeaMessageParseException e) {
328                     return Observable.empty();
329                 } catch (RuntimeException e) {
330                     return Observable.empty();
331                 }
332 
333             }
334         };
335     }
336 
337     public static class TimestampedAndLines<T extends AisMessage> {
338         private final Optional<Timestamped<T>> message;
339         private final List<String> lines;
340         private final String error;
341 
342         public TimestampedAndLines(Optional<Timestamped<T>> message, List<String> lines,
343                 String error) {
344             this.message = message;
345             this.lines = lines;
346             this.error = error;
347         }
348 
349         public Optional<Timestamped<T>> getMessage() {
350             return message;
351         }
352 
353         public List<String> getLines() {
354             return lines;
355         }
356 
357         public String getError() {
358             return error;
359         }
360 
361         @Override
362         public String toString() {
363             StringBuilder builder = new StringBuilder();
364             if (message.isPresent())
365                 builder.append("message=" + message);
366             else
367                 builder.append("error=" + error);
368             builder.append(", lines=");
369             builder.append(lines);
370             return builder.toString();
371         }
372 
373     }
374 
375     public static class TimestampedAndLine<T extends AisMessage> {
376         private final Optional<Timestamped<T>> message;
377         private final String line;
378         private final String error;
379 
380         public TimestampedAndLine(Optional<Timestamped<T>> message, String line, String error) {
381             this.message = message;
382             this.line = line;
383             this.error = error;
384         }
385 
386         public Optional<Timestamped<T>> getMessage() {
387             return message;
388         }
389 
390         public String getLine() {
391             return line;
392         }
393 
394         public String getError() {
395             return error;
396         }
397 
398         @Override
399         public String toString() {
400             StringBuilder builder = new StringBuilder();
401             if (message != null)
402                 builder.append("message=" + message);
403             else
404                 builder.append("error=" + error);
405             builder.append(", line=");
406             builder.append(line);
407             return builder.toString();
408         }
409 
410     }
411 
412     // public static final Func1<NmeaMessage,
413     // Observable<Timestamped<AisMessage>>> toAisMessage(
414     // final boolean logWarnings) {
415     // return new Func1<NmeaMessage, Observable<Timestamped<AisMessage>>>() {
416     //
417     // @Override
418     // public Observable<Timestamped<AisMessage>> call(NmeaMessage nmea) {
419     // try {
420     // AisNmeaMessage n = new AisNmeaMessage(nmea);
421     // Timestamped<AisMessage> m = n.getTimestampedMessage();
422     // if (m.message() instanceof AisShipStaticA) {
423     // AisShipStaticA s = (AisShipStaticA) m.message();
424     // if (logWarnings
425     // && containsWeirdCharacters(s.getDestination())) {
426     // log.warn("weird destination '" + s.getDestination()
427     // + "'");
428     // log.warn("line=" + n.getNmea().toLine());
429     // }
430     // }
431     // return Observable.just(m);
432     // } catch (AisParseException e) {
433     // return Observable.empty();
434     // }
435     // }
436     //
437     // };
438     // }
439 
440     public static final Func1<NmeaMessage, Optional<Timestamped<AisMessage>>> TO_AIS_MESSAGE = new Func1<NmeaMessage, Optional<Timestamped<AisMessage>>>() {
441 
442         @Override
443         public Optional<Timestamped<AisMessage>> call(NmeaMessage nmea) {
444             try {
445                 AisNmeaMessage n = new AisNmeaMessage(nmea);
446                 Timestamped<AisMessage> m = n.getTimestampedMessage();
447                 // if (m.message() instanceof AisShipStaticA) {
448                 // AisShipStaticA s = (AisShipStaticA) m.message();
449                 // if (logWarnings
450                 // && containsWeirdCharacters(s.getDestination())) {
451                 // log.warn("weird destination '" + s.getDestination()
452                 // + "'");
453                 // log.warn("line=" + n.getNmea().toLine());
454                 // }
455                 // }
456                 return Optional.of(m);
457             } catch (RuntimeException e) {
458                 return Optional.absent();
459             }
460         }
461     };
462 
463     private static boolean containsWeirdCharacters(String s) {
464         if (s == null)
465             return false;
466         else {
467             for (char ch : s.toCharArray()) {
468                 if (ch < 32 && ch != 10 && ch != 13) {
469                     log.warn("ch=" + (int) ch);
470                     return true;
471                 }
472             }
473         }
474         return false;
475     }
476 
477     public static final Func1<NmeaMessage, TimestampedAndLine<AisMessage>> TO_AIS_MESSAGE_AND_LINE = nmea -> {
478         String line = nmea.toLine();
479         try {
480             AisNmeaMessage n = new AisNmeaMessage(nmea);
481             return new TimestampedAndLine<AisMessage>(
482                     Optional.of(n.getTimestampedMessage(System.currentTimeMillis())), line, null);
483         } catch (AisParseException e) {
484             return new TimestampedAndLine<AisMessage>(Optional.<Timestamped<AisMessage>> absent(),
485                     line, e.getMessage());
486         } catch (RuntimeException e) {
487             log.warn(e.getMessage(), e);
488             throw e;
489         }
490     };
491 
492     public static final Func1<Optional<List<NmeaMessage>>, TimestampedAndLines<AisMessage>> TO_AIS_MESSAGE_AND_LINES = nmeas -> {
493         if (nmeas.isPresent()) {
494             List<String> lines = nmeas.get() //
495                     .stream() //
496                     .map(new Function<NmeaMessage, String>() {
497                 @Override
498                 public String apply(NmeaMessage t) {
499                     return t.toLine();
500                 }
501             }).collect(Collectors.toList());
502             Optional<NmeaMessage> concat = AisNmeaBuffer.concatenateMessages(nmeas.get());
503             if (concat.isPresent()) {
504                 try {
505                     AisNmeaMessage n = new AisNmeaMessage(concat.get());
506                     return new TimestampedAndLines<AisMessage>(
507                             Optional.of(n.getTimestampedMessage(System.currentTimeMillis())), lines,
508                             null);
509                 } catch (AisParseException e) {
510                     return new TimestampedAndLines<AisMessage>(
511                             Optional.<Timestamped<AisMessage>> absent(), lines, e.getMessage());
512                 }
513             } else {
514                 return new TimestampedAndLines<AisMessage>(
515                         Optional.<Timestamped<AisMessage>> absent(), lines, "could not concat");
516             }
517         } else {
518             return new TimestampedAndLines<AisMessage>(Optional.absent(), Collections.emptyList(),
519                     null);
520         }
521     };
522 
523     public static final Transformer<NmeaMessage, Optional<List<NmeaMessage>>> addToBuffer(
524             int bufferSize) {
525         return new Transformer<NmeaMessage, Optional<List<NmeaMessage>>>() {
526 
527             @Override
528             public Observable<Optional<List<NmeaMessage>>> call(Observable<NmeaMessage> o) {
529                 return Observable.defer(() -> {
530                     AisNmeaBuffer buffer = new AisNmeaBuffer(bufferSize);
531                     // use maxConcurrent so doesn't request unbounded
532                     return o.map(nmea -> buffer.add(nmea));
533                 });
534             }
535 
536         };
537     }
538 
539     public static final Transformer<NmeaMessage, NmeaMessage> aggregateMultiLineNmea(
540             int bufferSize) {
541         return new Transformer<NmeaMessage, NmeaMessage>() {
542 
543             @Override
544             public Observable<NmeaMessage> call(Observable<NmeaMessage> o) {
545                 return Observable.defer(() -> {
546                     AisNmeaBuffer buffer = new AisNmeaBuffer(bufferSize);
547                     // use maxConcurrent so doesn't request unbounded
548                     return o.flatMap(nmea -> {
549                         return addToBuffer(buffer, nmea);
550                     } , 1);
551                 });
552             }
553 
554         };
555     }
556 
557     public static final Transformer<NmeaMessage, NmeaMessage> aggregateMultiLineNmeaWithLines(
558             int bufferSize) {
559         return new Transformer<NmeaMessage, NmeaMessage>() {
560 
561             @Override
562             public Observable<NmeaMessage> call(Observable<NmeaMessage> o) {
563                 return Observable.defer(() -> {
564                     AisNmeaBuffer buffer = new AisNmeaBuffer(bufferSize);
565                     // use maxConcurrent so doesn't request unbounded
566                     return o.flatMap(nmea -> {
567                         return addToBuffer(buffer, nmea);
568                     } , 1);
569                 });
570             }
571 
572         };
573     }
574 
575     private static Observable<? extends NmeaMessage> addToBuffer(AisNmeaBuffer buffer,
576             NmeaMessage nmea) {
577         try {
578             Optional<List<NmeaMessage>> list = buffer.add(nmea);
579             if (!list.isPresent())
580                 return Observable.empty();
581             else {
582                 Optional<NmeaMessage> concat = AisNmeaBuffer.concatenateMessages(list.get());
583                 if (concat.isPresent())
584                     return Observable.just(concat.get());
585                 else
586                     return Observable.empty();
587             }
588         } catch (RuntimeException e) {
589             log.warn(e.getMessage(), e);
590             return Observable.empty();
591         }
592     }
593 
594     // private static Charset US_ASCII = Charset.forName("US-ASCII");
595 
596     public static Observable<String> connectOnce(final HostPort hostPort) {
597 
598         return Observable.unsafeCreate(new OnSubscribe<String>() {
599 
600             private Socket socket = null;
601 
602             private BufferedReader reader = null;
603 
604             @Override
605             public void call(Subscriber<? super String> subscriber) {
606                 try {
607                     synchronized (this) {
608                         log.info("creating new socket");
609                         socket = createSocket(hostPort.getHost(), hostPort.getPort());
610                     }
611                     log.info("waiting one second before attempting connect");
612                     Thread.sleep(1000);
613                     InputStream is = socket.getInputStream();
614                     reader = new BufferedReader(new InputStreamReader(is, UTF8));
615                     subscriber.add(createSubscription());
616                     while (!subscriber.isUnsubscribed()) {
617                         String line;
618                         try {
619                             line = reader.readLine();
620                         } catch (IOException e) {
621                             if (subscriber.isUnsubscribed())
622                                 // most likely socket closed as a result of
623                                 // unsubscribe so don't report as onError
624                                 return;
625                             else
626                                 throw e;
627                         }
628                         if (line != null)
629                             subscriber.onNext(line);
630                         else {
631                             // close stuff eagerly rather than waiting for
632                             // unsubscribe following onComplete
633                             cancel();
634                             subscriber.onCompleted();
635                         }
636                     }
637                 } catch (Exception e) {
638                     log.warn(e.getMessage(), e);
639                     cancel();
640                     subscriber.onError(e);
641                 }
642             }
643 
644             private Subscription createSubscription() {
645                 return new Subscription() {
646 
647                     private final AtomicBoolean subscribed = new AtomicBoolean(true);
648 
649                     @Override
650                     public boolean isUnsubscribed() {
651                         return !subscribed.get();
652                     }
653 
654                     @Override
655                     public void unsubscribe() {
656                         subscribed.set(false);
657                         cancel();
658                     }
659                 };
660             }
661 
662             public void cancel() {
663                 log.info("cancelling socket read");
664                 // only allow socket to be closed once because a fresh
665                 // instance of Socket could have been opened to the same
666                 // host and port and we don't want to mess with it.
667                 synchronized (this) {
668                     if (socket != null) {
669                         if (reader != null)
670                             try {
671                                 reader.close();
672                             } catch (IOException e) {
673                                 // ignore
674                             }
675                         try {
676                             socket.close();
677                             // release memory (not much)
678                             socket = null;
679                         } catch (IOException e) {
680                             // ignore
681                         }
682                     }
683                 }
684             }
685         });
686     }
687 
688     private static Socket createSocket(final String host, final int port) {
689         try {
690             return new Socket(host, port);
691         } catch (UnknownHostException e) {
692             throw new RuntimeException(e);
693         } catch (IOException e) {
694             throw new RuntimeException(e);
695         }
696     }
697 
698     public static Func1<List<File>, Observable<Integer>> extractFixesFromNmeaGzAndAppendToFile(
699             final int linesPerProcessor, final Scheduler scheduler,
700             final Func1<Fix, String> fileMapper, final int writeBufferSize,
701             final Action1<File> logger) {
702         return files -> {
703             Observable<Fix> fixes = Streams
704                     .extractFixes(
705                             Observable.from(files)
706                                     // log
707                                     .doOnNext(logger)
708                                     // one file at a time
709                                     .concatMap(file -> Streams.nmeaFromGzip(file.getAbsolutePath()) //
710                                             .doOnError(e -> log.warn("problem reading file " + file
711                                                     + ": " + e.getMessage())) //
712                             .onErrorResumeNext(Observable.empty())));
713             return BinaryFixesWriter
714                     .writeFixes(fileMapper, fixes, writeBufferSize, false,
715                             BinaryFixesFormat.WITHOUT_MMSI)
716                     // total counts
717                     .reduce(0, countFixes())
718                     // do async
719                     .subscribeOn(scheduler);
720         };
721     }
722 
723     private static Func2<Integer, List<Fix>, Integer> countFixes() {
724         return (count, fixes) -> count + fixes.size();
725     }
726 
727     public static Observable<Integer> writeFixesFromNmeaGz(File input, Pattern inputPattern,
728             File output, int logEvery, int writeBufferSize, Scheduler scheduler,
729             int linesPerProcessor, long downSampleIntervalMs, Func1<Fix, String> fileMapper) {
730 
731         final List<File> fileList = Files.find(input, inputPattern);
732         Observable<File> files = Observable.from(fileList);
733 
734         // count files across parallel streams
735         // OperatorLogging<File> logger = Logging.<File> logger().showCount()
736         // .showRateSinceStart("rateFilesPerSecond").showValue().log();
737         Action1<File> logger = new Action1<File>() {
738             AtomicInteger count = new AtomicInteger();
739             Long start = null;
740 
741             @Override
742             public void call(File file) {
743                 if (start == null)
744                     start = System.currentTimeMillis();
745                 int num = count.incrementAndGet();
746                 double filesPerSecond = (System.currentTimeMillis() - start) / (double) num
747                         / 1000.0;
748                 log.info("file " + num + " of " + fileList.size() + ", " + file.getName()
749                         + ", rateFilesPerSecond=" + filesPerSecond);
750             }
751         };
752 
753         deleteDirectory(output);
754 
755         return files
756                 // log the filename
757                 .buffer(Math.max(fileList.size() / Runtime.getRuntime().availableProcessors(), 1))
758                 // extract fixes
759                 .flatMap(extractFixesFromNmeaGzAndAppendToFile(linesPerProcessor, scheduler,
760                         fileMapper, writeBufferSize, logger), 1)
761                 // count number written fixes
762                 .scan(0, (a, b) -> a + b)
763                 // log
764                 .lift(Logging.<Integer> logger().showCount().showMemory()
765                         .showRateSince("rate", 5000).every(logEvery).log())
766                 // get the final count
767                 .last()
768                 // on completion of writing fixes, sort the track files and emit
769                 // the count of files
770                 .doOnCompleted(
771                         () -> log.info("completed converting nmea to binary fixes, starting sort"))
772                 .concatWith(BinaryFixes.sortBinaryFixFilesByTime(output, downSampleIntervalMs,
773                         scheduler));
774     }
775 
776     private static void deleteDirectory(File output) {
777         try {
778             FileUtils.deleteDirectory(output);
779         } catch (IOException e) {
780             throw new RuntimeException(e);
781         }
782     }
783 
784     public static void main(String[] args) {
785         Streams.nmeaFromGzip(new File("/media/an/nmea/2015/NMEA_ITU_20150521.gz"))
786                 .compose(o -> Streams.extract(o)).takeLast(10000).forEach(System.out::println);
787 
788     }
789 }