1 package au.gov.amsa.ais;
2
3 import static au.gov.amsa.ais.NmeaMessageExactEarthTimestamp.isExactEarthTimestamp;
4
5 import java.util.List;
6 import java.util.Set;
7 import java.util.TreeSet;
8 import java.util.concurrent.atomic.AtomicLong;
9
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import au.gov.amsa.util.nmea.NmeaMessage;
14 import au.gov.amsa.util.nmea.NmeaMessageParseException;
15 import au.gov.amsa.util.nmea.NmeaUtil;
16
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Optional;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Sets;
21
22
23
24
25
26
27
28 public class NmeaStreamProcessor {
29
30 private static final int DEFAULT_NMEA_BUFFER_SIZE = 100;
31 private static final int DEFAULT_LOG_COUNT_FREQUENCY = 100000;
32 private static Logger log = LoggerFactory
33 .getLogger(NmeaStreamProcessor.class);
34 private static final long MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS = 1000;
35 private final NmeaStreamProcessorListener listener;
36 private final List<LineAndTime> lines = Lists.newArrayList();
37 private final boolean matchWithTimestampLine;
38 private final AtomicLong count = new AtomicLong();
39 private final long logCountFrequency;
40 private final AisNmeaBuffer nmeaBuffer;
41
42
43
44
45
46
47
48 public NmeaStreamProcessor(NmeaStreamProcessorListener listener,
49 boolean matchWithTimestampLine, long logCountFrequency,
50 int nmeaBufferSize) {
51 this.listener = listener;
52 this.matchWithTimestampLine = matchWithTimestampLine;
53 this.logCountFrequency = logCountFrequency;
54 this.nmeaBuffer = new AisNmeaBuffer(nmeaBufferSize);
55 }
56
57
58
59
60
61
62
63 public NmeaStreamProcessor(NmeaStreamProcessorListener listener,
64 boolean matchWithTimestampLine, long logCountFrequency) {
65 this(listener, matchWithTimestampLine, logCountFrequency,
66 DEFAULT_NMEA_BUFFER_SIZE);
67 }
68
69 public NmeaStreamProcessor(NmeaStreamProcessorListener listener,
70 boolean matchWithTimestampLine) {
71 this(listener, matchWithTimestampLine, DEFAULT_LOG_COUNT_FREQUENCY,
72 DEFAULT_NMEA_BUFFER_SIZE);
73 }
74
75
76
77
78
79
80
81 public void line(String line) {
82 line(line, System.currentTimeMillis());
83 }
84
85
86
87
88
89
90
91 synchronized void line(String line, long arrivalTime) {
92
93 if (count.incrementAndGet() % logCountFrequency == 0)
94 log.info("count=" + count.get() + ",buffer size=" + lines.size());
95
96 NmeaMessage nmea;
97 try {
98 nmea = NmeaUtil.parseNmea(line);
99 } catch (NmeaMessageParseException e) {
100 listener.invalidNmea(line, arrivalTime, e.getMessage());
101 return;
102 }
103
104
105
106 if (!nmea.isSingleSentence()) {
107 Optional<List<NmeaMessage>> messages = nmeaBuffer.add(nmea);
108 if (messages.isPresent()) {
109 Optional<NmeaMessage> joined = AisNmeaBuffer
110 .concatenateMessages(messages.get());
111 if (joined.isPresent()) {
112 if (joined.get().getUnixTimeMillis() != null)
113 listener.message(joined.get().toLine(), joined.get()
114 .getUnixTimeMillis());
115 else
116 listener.message(joined.get().toLine(), arrivalTime);
117 }
118
119
120 }
121 return;
122 }
123
124 if (nmea.getUnixTimeMillis() != null) {
125 listener.message(line, nmea.getUnixTimeMillis());
126 return;
127 }
128
129 if (!matchWithTimestampLine) {
130 listener.message(line, arrivalTime);
131 return;
132 }
133
134 if (!NmeaUtil.isValid(line))
135 return;
136
137 addLine(line, arrivalTime);
138 log.debug("buffer lines=" + lines.size());
139 Integer earliestTimestampLineIndex = getEarliestTimestampLineIndex(lines);
140 Set<Integer> removeThese;
141 if (earliestTimestampLineIndex != null) {
142 removeThese = matchWithClosestAisMessageIfBufferLargeEnough(
143 arrivalTime, earliestTimestampLineIndex);
144 } else
145 removeThese = findExpiredIndexesBeforeIndex(lastIndex());
146 TreeSet<Integer> orderedIndexes = Sets.newTreeSet(removeThese);
147 for (int index : orderedIndexes.descendingSet()) {
148 removeLineWithIndex(index);
149 }
150 }
151
152
153
154
155
156
157 private int lastIndex() {
158 return lines.size() - 1;
159 }
160
161
162
163
164
165
166
167
168
169
170 private Set<Integer> findExpiredIndexesBeforeIndex(int index) {
171 long indexTime = getLineTime(index);
172 Set<Integer> removeThese = Sets.newHashSet();
173 for (int i = index - 1; i >= 0; i--) {
174 if (indexTime - getLineTime(i) > MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS) {
175 listener.timestampNotFound(getLine(i), getLineTime(i));
176 removeThese.add(i);
177 }
178 }
179 return removeThese;
180 }
181
182
183
184
185
186
187
188
189
190
191 private Set<Integer> matchWithClosestAisMessageIfBufferLargeEnough(
192 long arrivalTime, Integer earliestTimestampLineIndex) {
193 String timestampLine = getLine(earliestTimestampLineIndex);
194 long time = getLineTime(earliestTimestampLineIndex);
195 log.debug("ts=" + timestampLine + ",time=" + time);
196 Set<Integer> removeThese;
197 if (arrivalTime - time > MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS) {
198 removeThese = matchWithClosestAisMessageAndFindIndexesToRemove(
199 earliestTimestampLineIndex, timestampLine, time);
200 } else
201 removeThese = findExpiredIndexesBeforeIndex(earliestTimestampLineIndex);
202 return removeThese;
203 }
204
205
206
207
208
209
210
211
212
213
214
215 private Set<Integer> matchWithClosestAisMessageAndFindIndexesToRemove(
216 int earliestTimestampLineIndex, String timestampLine, long time) {
217
218
219 NmeaMessageExactEarthTimestamp timestamp = new NmeaMessageExactEarthTimestamp(
220 timestampLine);
221 String checksum = timestamp.getFollowingSequenceChecksum();
222 log.debug("looking for checksum=" + checksum);
223
224 Integer lowestTimeDiffIndex = findClosestMatchingMessageInTermsOfArrivalTime(
225 time, checksum);
226 Set<Integer> removeThese;
227 if (lowestTimeDiffIndex != null) {
228 removeThese = reportMatchAndFindIndexesToRemove(
229 earliestTimestampLineIndex, timestamp, lowestTimeDiffIndex);
230 } else
231
232 removeThese = Sets.newHashSet(earliestTimestampLineIndex);
233 return removeThese;
234 }
235
236
237
238
239
240
241
242
243
244
245
246 private Integer findClosestMatchingMessageInTermsOfArrivalTime(long time,
247 String checksum) {
248
249 Long lowestTimeDiff = null;
250 Integer lowestTimeDiffIndex = null;
251 for (int i = 0; i < getNumLines(); i++) {
252 String msg = getLine(i);
253 Long msgTime = getLineTime(i);
254 if (!isExactEarthTimestamp(msg)) {
255 try {
256 AisNmeaMessage nmea = new AisNmeaMessage(msg);
257 if (nmea.getChecksum().equals(checksum)) {
258 long diff = Math.abs(msgTime - time);
259 boolean closer = (lowestTimeDiff == null || diff < lowestTimeDiff)
260 && (diff <= MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS);
261 if (closer) {
262 lowestTimeDiff = diff;
263 lowestTimeDiffIndex = i;
264 }
265 }
266 } catch (AisParseException e) {
267 log.debug(e.getMessage());
268 }
269 }
270 }
271 return lowestTimeDiffIndex;
272 }
273
274
275
276
277
278
279
280
281
282
283 private Set<Integer> reportMatchAndFindIndexesToRemove(
284 Integer earliestTimestampLineIndex,
285 NmeaMessageExactEarthTimestamp timestamp,
286 Integer lowestTimeDiffIndex) {
287 String msg = getLine(lowestTimeDiffIndex);
288 log.debug("found matching msg=" + msg);
289 listener.message(msg, timestamp.getTime());
290 int maxIndex = Math
291 .max(lowestTimeDiffIndex, earliestTimestampLineIndex);
292 int minIndex = Math
293 .min(lowestTimeDiffIndex, earliestTimestampLineIndex);
294
295
296
297 return Sets.newHashSet(minIndex, maxIndex);
298 }
299
300
301
302
303
304
305
306 private void addLine(String line, long time) {
307 lines.add(new LineAndTime(line, time));
308 }
309
310
311
312
313
314
315 private int getNumLines() {
316 return lines.size();
317 }
318
319
320
321
322
323
324
325 private String getLine(int index) {
326 return lines.get(index).getLine();
327 }
328
329
330
331
332
333
334
335 private long getLineTime(int index) {
336 return lines.get(index).getTime();
337 }
338
339
340
341
342
343
344 private void removeLineWithIndex(int index) {
345 lines.remove(index);
346 }
347
348
349
350
351
352
353 @VisibleForTesting
354 List<LineAndTime> getBuffer() {
355 return Lists.newArrayList(lines);
356 }
357
358
359
360
361
362
363
364
365 private static Integer getEarliestTimestampLineIndex(List<LineAndTime> lines) {
366 Integer i = 0;
367 for (LineAndTime line : lines) {
368 if (isExactEarthTimestamp(line.getLine()))
369 return i;
370 else
371 i++;
372 }
373 return null;
374 }
375
376 }