View Javadoc
1   package au.gov.amsa.navigation;
2   
import java.io.File;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;
import java.util.TreeSet;

import com.github.davidmoten.guavamini.Preconditions;
import com.github.davidmoten.rx.Checked;
import com.github.davidmoten.rx.Transformers;
import com.github.davidmoten.rx.slf4j.Logging;
import com.google.common.base.Optional;

import au.gov.amsa.ais.AisMessage;
import au.gov.amsa.ais.Timestamped;
import au.gov.amsa.ais.message.AisShipStatic;
import au.gov.amsa.ais.message.AisShipStaticA;
import au.gov.amsa.ais.message.AisShipStaticUtil;
import au.gov.amsa.ais.rx.Streams;
import au.gov.amsa.ais.rx.Streams.TimestampedAndLine;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;
32  
33  public final class ShipStaticDataCreator {
34  
35      public static Observable<AisShipStatic> writeStaticDataToFile(List<File> files,
36              File outputFile) {
37  
38          return writeStaticDataToFile(files, outputFile, Schedulers.computation());
39      }
40  
41      public static Observable<AisShipStatic> writeStaticDataToFile(List<File> files, File outputFile,
42              Scheduler scheduler) {
43          Func0<PrintStream> resourceFactory = Checked.f0(() -> new PrintStream(outputFile));
44          Func1<PrintStream, Observable<AisShipStatic>> observableFactory = out -> Observable
45                  .from(files)
46                  // buffer into chunks for each processor
47                  .buffer(Math.max(1, files.size() / Runtime.getRuntime().availableProcessors() - 1))
48                  .flatMap(
49                          list -> Observable.from(list) //
50                                  .lift(Logging.<File> logger().showValue().showMemory().log()) //
51                                  .concatMap(
52                                          file -> Streams.extract(Streams.nmeaFromGzip(file)) //
53                                                  .flatMap(aisShipStaticOnly) //
54                                                  .map(m -> m.getMessage().get().message()) //
55                                                  .distinct(m -> m.getMmsi()) //
56                                                  .doOnError(e -> System.err.println("could not read "
57                                                          + file + ": " + e.getMessage())) //
58                                  .onErrorResumeNext(Observable.<AisShipStatic> empty())) //
59                          .distinct(m -> m.getMmsi()) //
60                          .subscribeOn(scheduler)) //
61                  .distinct(m -> m.getMmsi()) //
62                  .compose(Transformers.mapWithIndex()) //
63                  .doOnNext(indexed -> {
64                      if (indexed.index() == 0) {
65                          out.println(
66                                  "# MMSI, IMO, AisClass, AisShipType, MaxPresentStaticDraughtMetres, DimAMetres, DimBMetres, DimCMetres, DimDMetres, LengthMetres, WidthMetres, Name");
67                          out.println("# columns are tab delimited");
68                          out.println("# -1 = not present");
69                      }
70                  })
71                  //
72                  .map(indexed -> indexed.value())
73                  //
74                  .doOnNext(m -> {
75                      out.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", m.getMmsi(),
76                              getImo(m).or(-1), m instanceof AisShipStaticA ? "A" : "B",
77                              m.getShipType(), getMaximumPresentStaticDraughtMetres(m).or(-1F),
78                              m.getDimensionA().or(-1), m.getDimensionB().or(-1),
79                              m.getDimensionC().or(-1), m.getDimensionD().or(-1),
80                              AisShipStaticUtil.lengthMetres(m).or(-1),
81                              AisShipStaticUtil.widthMetres(m).or(-1), prepareName(m.getName()));
82                      out.flush();
83                  });
84  
85          Action1<PrintStream> disposeAction = out -> out.close();
86          return Observable.using(resourceFactory, observableFactory, disposeAction);
87      }
88  
89      public static Observable<Timestamped<AisShipStatic>> writeStaticDataToFileWithTimestamps(
90              List<File> files, File outputFile, Scheduler scheduler) {
91          SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
92          sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
93          Func0<PrintStream> resourceFactory = Checked.f0(() -> new PrintStream(outputFile));
94          Func1<PrintStream, Observable<Timestamped<AisShipStatic>>> observableFactory = out -> Observable
95                  .from(files)
96                  // buffer into chunks for each processor
97                  .buffer(Math.max(1, files.size() / Runtime.getRuntime().availableProcessors() - 1))
98                  .flatMap(list -> Observable.from(list) //
99                          .lift(Logging.<File> logger().showValue().showMemory().log()) //
100                         .concatMap(file -> timestampedShipStatics(file))) //
101                 .groupBy(m -> m.message().getMmsi()) //
102                 .flatMap(g -> collect(g).subscribeOn(scheduler)) //
103                 .compose(Transformers.doOnFirst(x -> {
104                     out.println(
105                             "# MMSI, Time, IMO, AisClass, AisShipType, MaxPresentStaticDraughtMetres, DimAMetres, DimBMetres, DimCMetres, DimDMetres, LengthMetres, WidthMetres, Name");
106                     out.println("# columns are tab delimited");
107                     out.println("# -1 = not present");
108                 })) //
109                 .filter(set -> set.size() <= 10) //
110                 .flatMapIterable(set -> set) //
111                 .doOnNext(k -> {
112                     AisShipStatic m = k.message();
113                     out.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", m.getMmsi(),
114                             sdf.format(new Date(k.time())), getImo(m).or(-1),
115                             m instanceof AisShipStaticA ? "A" : "B", m.getShipType(),
116                             getMaximumPresentStaticDraughtMetres(m).or(-1F),
117                             m.getDimensionA().or(-1), m.getDimensionB().or(-1),
118                             m.getDimensionC().or(-1), m.getDimensionD().or(-1),
119                             AisShipStaticUtil.lengthMetres(m).or(-1),
120                             AisShipStaticUtil.widthMetres(m).or(-1), prepareName(m.getName()));
121                     out.flush();
122                 });
123         Action1<PrintStream> disposeAction = out -> out.close();
124         return Observable.using(resourceFactory, observableFactory, disposeAction);
125     }
126 
127     private static Observable<Timestamped<AisShipStatic>> timestampedShipStatics(File file) {
128         return Streams.extract(Streams.nmeaFromGzip(file)) //
129                 .flatMap(aisShipStaticOnly) //
130                 .doOnError(
131                         e -> System.err.println("could not read " + file + ": " + e.getMessage()))
132                 .onErrorResumeNext(Observable.<TimestampedAndLine<AisShipStatic>> empty())
133                 .filter(x -> x.getMessage().isPresent()) //
134                 .map(x -> x.getMessage().get());
135     }
136 
137     private static Observable<TreeSet<Timestamped<AisShipStatic>>> collect(
138             GroupedObservable<Integer, Timestamped<AisShipStatic>> g) {
139         return g.collect(() -> new TreeSet<Timestamped<AisShipStatic>>(
140                 (a, b) -> Long.compare(a.time(), b.time())), (set, x) -> {
141                     Timestamped<AisShipStatic> a = set.floor(x);
142                     Timestamped<AisShipStatic> b = set.ceiling(x);
143                     if (a != null && a.time() == x.time()) {
144                         return;
145                     }
146                     if (b != null && b.time() == x.time()) {
147                         return;
148                     }
149 
150                     // There is a hole in this code for when a == b and a != x
151                     // this seems unlikely and is presumably a correction
152                     // of a bad record so happy to miss this case.
153                     if (a == null) {
154                         // nothing before
155                         if (b == null) {
156                             // nothing after
157                             set.add(x);
158                         } else if (isDifferent(b.message(), x.message())) {
159                             set.add(x);
160                         }
161                         // otherwise ignore
162                     } else {
163                         boolean axDifferent = isDifferent(a.message(), x.message());
164                         if (b == null) {
165                             // nothing after
166                             if (axDifferent) {
167                                 set.add(x);
168                             }
169                         } else {
170                             boolean bxDifferent = isDifferent(x.message(), b.message());
171                             if (axDifferent) {
172                                 set.add(x);
173                                 if (!bxDifferent) {
174                                     remove(set, b);
175                                 }
176                             }
177                         }
178                     }
179                 });
180     }
181 
182     private static void remove(TreeSet<Timestamped<AisShipStatic>> set,
183             Timestamped<AisShipStatic> a) {
184         // slow O(n) remove
185         Iterator<Timestamped<AisShipStatic>> it = set.iterator();
186         while (it.hasNext()) {
187             if (it.next() == a) {
188                 it.remove();
189             }
190         }
191     }
192 
193     private static final boolean justImo = true;
194 
195     private static boolean isDifferent(AisShipStatic a, AisShipStatic b) {
196         Preconditions.checkArgument(a.getMmsi() == b.getMmsi());
197 
198         boolean different = !justImo //
199                 && (!a.getDimensionA().equals(b.getDimensionA())
200                         || !a.getDimensionB().equals(b.getDimensionB())
201                         || !a.getDimensionC().equals(b.getDimensionC())
202                         || !a.getDimensionD().equals(b.getDimensionD())
203                         || !a.getLengthMetres().equals(b.getLengthMetres())
204                         || !a.getWidthMetres().equals(b.getWidthMetres())
205                         || !a.getName().equals(b.getName()) //
206                         || a.getShipType() != b.getShipType());
207         if (different) {
208             return true;
209         } else if (a instanceof AisShipStaticA && b instanceof AisShipStaticA) {
210             AisShipStaticA a2 = (AisShipStaticA) a;
211             AisShipStaticA b2 = (AisShipStaticA) b;
212             return !a2.getImo().equals(b2.getImo());
213         } else {
214             return false;
215         }
216     }
217 
218     private static String prepareName(String name) {
219         if (name == null)
220             return "";
221         else
222             return name.replace("\t", " ").trim();
223     }
224 
225     private static Optional<Integer> getImo(AisShipStatic m) {
226         if (m instanceof AisShipStaticA) {
227             return ((AisShipStaticA) m).getImo();
228         } else
229             return Optional.absent();
230     }
231 
232     private static Optional<Float> getMaximumPresentStaticDraughtMetres(AisShipStatic m) {
233         if (m instanceof AisShipStaticA) {
234             return Optional.of((float) ((AisShipStaticA) m).getMaximumPresentStaticDraughtMetres());
235         } else
236             return Optional.absent();
237     }
238 
239     private static Func1<TimestampedAndLine<AisMessage>, Observable<TimestampedAndLine<AisShipStatic>>> aisShipStaticOnly = m -> {
240         Optional<Timestamped<AisMessage>> message = m.getMessage();
241         if (message.isPresent() && message.get().message() instanceof AisShipStatic) {
242             @SuppressWarnings("unchecked")
243             TimestampedAndLine<AisShipStatic> m2 = (TimestampedAndLine<AisShipStatic>) (TimestampedAndLine<?>) m;
244             return Observable.just(m2);
245         } else
246             return Observable.empty();
247     };
248 }