1 package au.gov.amsa.navigation;
2
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.text.SimpleDateFormat;
6 import java.util.Date;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.TimeZone;
10 import java.util.TreeSet;
11
12 import com.github.davidmoten.guavamini.Preconditions;
13 import com.github.davidmoten.rx.Checked;
14 import com.github.davidmoten.rx.Transformers;
15 import com.github.davidmoten.rx.slf4j.Logging;
16 import com.google.common.base.Optional;
17
18 import au.gov.amsa.ais.AisMessage;
19 import au.gov.amsa.ais.Timestamped;
20 import au.gov.amsa.ais.message.AisShipStatic;
21 import au.gov.amsa.ais.message.AisShipStaticA;
22 import au.gov.amsa.ais.message.AisShipStaticUtil;
23 import au.gov.amsa.ais.rx.Streams;
24 import au.gov.amsa.ais.rx.Streams.TimestampedAndLine;
25 import rx.Observable;
26 import rx.Scheduler;
27 import rx.functions.Action1;
28 import rx.functions.Func0;
29 import rx.functions.Func1;
30 import rx.observables.GroupedObservable;
31 import rx.schedulers.Schedulers;
32
33 public final class ShipStaticDataCreator {
34
35 public static Observable<AisShipStatic> writeStaticDataToFile(List<File> files,
36 File outputFile) {
37
38 return writeStaticDataToFile(files, outputFile, Schedulers.computation());
39 }
40
41 public static Observable<AisShipStatic> writeStaticDataToFile(List<File> files, File outputFile,
42 Scheduler scheduler) {
43 Func0<PrintStream> resourceFactory = Checked.f0(() -> new PrintStream(outputFile));
44 Func1<PrintStream, Observable<AisShipStatic>> observableFactory = out -> Observable
45 .from(files)
46
47 .buffer(Math.max(1, files.size() / Runtime.getRuntime().availableProcessors() - 1))
48 .flatMap(
49 list -> Observable.from(list)
50 .lift(Logging.<File> logger().showValue().showMemory().log())
51 .concatMap(
52 file -> Streams.extract(Streams.nmeaFromGzip(file))
53 .flatMap(aisShipStaticOnly)
54 .map(m -> m.getMessage().get().message())
55 .distinct(m -> m.getMmsi())
56 .doOnError(e -> System.err.println("could not read "
57 + file + ": " + e.getMessage()))
58 .onErrorResumeNext(Observable.<AisShipStatic> empty()))
59 .distinct(m -> m.getMmsi())
60 .subscribeOn(scheduler))
61 .distinct(m -> m.getMmsi())
62 .compose(Transformers.mapWithIndex())
63 .doOnNext(indexed -> {
64 if (indexed.index() == 0) {
65 out.println(
66 "# MMSI, IMO, AisClass, AisShipType, MaxPresentStaticDraughtMetres, DimAMetres, DimBMetres, DimCMetres, DimDMetres, LengthMetres, WidthMetres, Name");
67 out.println("# columns are tab delimited");
68 out.println("# -1 = not present");
69 }
70 })
71
72 .map(indexed -> indexed.value())
73
74 .doOnNext(m -> {
75 out.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", m.getMmsi(),
76 getImo(m).or(-1), m instanceof AisShipStaticA ? "A" : "B",
77 m.getShipType(), getMaximumPresentStaticDraughtMetres(m).or(-1F),
78 m.getDimensionA().or(-1), m.getDimensionB().or(-1),
79 m.getDimensionC().or(-1), m.getDimensionD().or(-1),
80 AisShipStaticUtil.lengthMetres(m).or(-1),
81 AisShipStaticUtil.widthMetres(m).or(-1), prepareName(m.getName()));
82 out.flush();
83 });
84
85 Action1<PrintStream> disposeAction = out -> out.close();
86 return Observable.using(resourceFactory, observableFactory, disposeAction);
87 }
88
89 public static Observable<Timestamped<AisShipStatic>> writeStaticDataToFileWithTimestamps(
90 List<File> files, File outputFile, Scheduler scheduler) {
91 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
92 sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
93 Func0<PrintStream> resourceFactory = Checked.f0(() -> new PrintStream(outputFile));
94 Func1<PrintStream, Observable<Timestamped<AisShipStatic>>> observableFactory = out -> Observable
95 .from(files)
96
97 .buffer(Math.max(1, files.size() / Runtime.getRuntime().availableProcessors() - 1))
98 .flatMap(list -> Observable.from(list)
99 .lift(Logging.<File> logger().showValue().showMemory().log())
100 .concatMap(file -> timestampedShipStatics(file)))
101 .groupBy(m -> m.message().getMmsi())
102 .flatMap(g -> collect(g).subscribeOn(scheduler))
103 .compose(Transformers.doOnFirst(x -> {
104 out.println(
105 "# MMSI, Time, IMO, AisClass, AisShipType, MaxPresentStaticDraughtMetres, DimAMetres, DimBMetres, DimCMetres, DimDMetres, LengthMetres, WidthMetres, Name");
106 out.println("# columns are tab delimited");
107 out.println("# -1 = not present");
108 }))
109 .filter(set -> set.size() <= 10)
110 .flatMapIterable(set -> set)
111 .doOnNext(k -> {
112 AisShipStatic m = k.message();
113 out.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", m.getMmsi(),
114 sdf.format(new Date(k.time())), getImo(m).or(-1),
115 m instanceof AisShipStaticA ? "A" : "B", m.getShipType(),
116 getMaximumPresentStaticDraughtMetres(m).or(-1F),
117 m.getDimensionA().or(-1), m.getDimensionB().or(-1),
118 m.getDimensionC().or(-1), m.getDimensionD().or(-1),
119 AisShipStaticUtil.lengthMetres(m).or(-1),
120 AisShipStaticUtil.widthMetres(m).or(-1), prepareName(m.getName()));
121 out.flush();
122 });
123 Action1<PrintStream> disposeAction = out -> out.close();
124 return Observable.using(resourceFactory, observableFactory, disposeAction);
125 }
126
127 private static Observable<Timestamped<AisShipStatic>> timestampedShipStatics(File file) {
128 return Streams.extract(Streams.nmeaFromGzip(file))
129 .flatMap(aisShipStaticOnly)
130 .doOnError(
131 e -> System.err.println("could not read " + file + ": " + e.getMessage()))
132 .onErrorResumeNext(Observable.<TimestampedAndLine<AisShipStatic>> empty())
133 .filter(x -> x.getMessage().isPresent())
134 .map(x -> x.getMessage().get());
135 }
136
137 private static Observable<TreeSet<Timestamped<AisShipStatic>>> collect(
138 GroupedObservable<Integer, Timestamped<AisShipStatic>> g) {
139 return g.collect(() -> new TreeSet<Timestamped<AisShipStatic>>(
140 (a, b) -> Long.compare(a.time(), b.time())), (set, x) -> {
141 Timestamped<AisShipStatic> a = set.floor(x);
142 Timestamped<AisShipStatic> b = set.ceiling(x);
143 if (a != null && a.time() == x.time()) {
144 return;
145 }
146 if (b != null && b.time() == x.time()) {
147 return;
148 }
149
150
151
152
153 if (a == null) {
154
155 if (b == null) {
156
157 set.add(x);
158 } else if (isDifferent(b.message(), x.message())) {
159 set.add(x);
160 }
161
162 } else {
163 boolean axDifferent = isDifferent(a.message(), x.message());
164 if (b == null) {
165
166 if (axDifferent) {
167 set.add(x);
168 }
169 } else {
170 boolean bxDifferent = isDifferent(x.message(), b.message());
171 if (axDifferent) {
172 set.add(x);
173 if (!bxDifferent) {
174 remove(set, b);
175 }
176 }
177 }
178 }
179 });
180 }
181
182 private static void remove(TreeSet<Timestamped<AisShipStatic>> set,
183 Timestamped<AisShipStatic> a) {
184
185 Iterator<Timestamped<AisShipStatic>> it = set.iterator();
186 while (it.hasNext()) {
187 if (it.next() == a) {
188 it.remove();
189 }
190 }
191 }
192
193 private static final boolean justImo = true;
194
195 private static boolean isDifferent(AisShipStatic a, AisShipStatic b) {
196 Preconditions.checkArgument(a.getMmsi() == b.getMmsi());
197
198 boolean different = !justImo
199 && (!a.getDimensionA().equals(b.getDimensionA())
200 || !a.getDimensionB().equals(b.getDimensionB())
201 || !a.getDimensionC().equals(b.getDimensionC())
202 || !a.getDimensionD().equals(b.getDimensionD())
203 || !a.getLengthMetres().equals(b.getLengthMetres())
204 || !a.getWidthMetres().equals(b.getWidthMetres())
205 || !a.getName().equals(b.getName())
206 || a.getShipType() != b.getShipType());
207 if (different) {
208 return true;
209 } else if (a instanceof AisShipStaticA && b instanceof AisShipStaticA) {
210 AisShipStaticA a2 = (AisShipStaticA) a;
211 AisShipStaticA b2 = (AisShipStaticA) b;
212 return !a2.getImo().equals(b2.getImo());
213 } else {
214 return false;
215 }
216 }
217
218 private static String prepareName(String name) {
219 if (name == null)
220 return "";
221 else
222 return name.replace("\t", " ").trim();
223 }
224
225 private static Optional<Integer> getImo(AisShipStatic m) {
226 if (m instanceof AisShipStaticA) {
227 return ((AisShipStaticA) m).getImo();
228 } else
229 return Optional.absent();
230 }
231
232 private static Optional<Float> getMaximumPresentStaticDraughtMetres(AisShipStatic m) {
233 if (m instanceof AisShipStaticA) {
234 return Optional.of((float) ((AisShipStaticA) m).getMaximumPresentStaticDraughtMetres());
235 } else
236 return Optional.absent();
237 }
238
239 private static Func1<TimestampedAndLine<AisMessage>, Observable<TimestampedAndLine<AisShipStatic>>> aisShipStaticOnly = m -> {
240 Optional<Timestamped<AisMessage>> message = m.getMessage();
241 if (message.isPresent() && message.get().message() instanceof AisShipStatic) {
242 @SuppressWarnings("unchecked")
243 TimestampedAndLine<AisShipStatic> m2 = (TimestampedAndLine<AisShipStatic>) (TimestampedAndLine<?>) m;
244 return Observable.just(m2);
245 } else
246 return Observable.empty();
247 };
248 }