View Javadoc
1   package au.gov.amsa.ais;
2   
3   import static au.gov.amsa.ais.NmeaMessageExactEarthTimestamp.isExactEarthTimestamp;
4   
5   import java.util.List;
6   import java.util.Set;
7   import java.util.TreeSet;
8   import java.util.concurrent.atomic.AtomicLong;
9   
10  import org.slf4j.Logger;
11  import org.slf4j.LoggerFactory;
12  
13  import au.gov.amsa.util.nmea.NmeaMessage;
14  import au.gov.amsa.util.nmea.NmeaMessageParseException;
15  import au.gov.amsa.util.nmea.NmeaUtil;
16  
17  import com.google.common.annotations.VisibleForTesting;
18  import com.google.common.base.Optional;
19  import com.google.common.collect.Lists;
20  import com.google.common.collect.Sets;
21  
22  /**
23   * Extracts time from a message if possible and reports results to listeners.
24   * 
25   * @author dxm
26   * 
27   */
28  public class NmeaStreamProcessor {
29  
30  	private static final int DEFAULT_NMEA_BUFFER_SIZE = 100;
31  	private static final int DEFAULT_LOG_COUNT_FREQUENCY = 100000;
32  	private static Logger log = LoggerFactory
33  			.getLogger(NmeaStreamProcessor.class);
34  	private static final long MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS = 1000;
35  	private final NmeaStreamProcessorListener listener;
36  	private final List<LineAndTime> lines = Lists.newArrayList();
37  	private final boolean matchWithTimestampLine;
38  	private final AtomicLong count = new AtomicLong();
39  	private final long logCountFrequency;
40  	private final AisNmeaBuffer nmeaBuffer;
41  
42  	/**
43  	 * Constructor.
44  	 * 
45  	 * @param listener
46  	 * @param matchWithTimestampLine
47  	 */
48  	public NmeaStreamProcessor(NmeaStreamProcessorListener listener,
49  			boolean matchWithTimestampLine, long logCountFrequency,
50  			int nmeaBufferSize) {
51  		this.listener = listener;
52  		this.matchWithTimestampLine = matchWithTimestampLine;
53  		this.logCountFrequency = logCountFrequency;
54  		this.nmeaBuffer = new AisNmeaBuffer(nmeaBufferSize);
55  	}
56  
57  	/**
58  	 * Constructor.
59  	 * 
60  	 * @param listener
61  	 * @param matchWithTimestampLine
62  	 */
63  	public NmeaStreamProcessor(NmeaStreamProcessorListener listener,
64  			boolean matchWithTimestampLine, long logCountFrequency) {
65  		this(listener, matchWithTimestampLine, logCountFrequency,
66  				DEFAULT_NMEA_BUFFER_SIZE);
67  	}
68  
69  	public NmeaStreamProcessor(NmeaStreamProcessorListener listener,
70  			boolean matchWithTimestampLine) {
71  		this(listener, matchWithTimestampLine, DEFAULT_LOG_COUNT_FREQUENCY,
72  				DEFAULT_NMEA_BUFFER_SIZE);
73  	}
74  
75  	/**
76  	 * Handles the arrival of a new NMEA line and assumes its arrival time is
77  	 * now.
78  	 * 
79  	 * @param line
80  	 */
81  	public void line(String line) {
82  		line(line, System.currentTimeMillis());
83  	}
84  
85  	/**
86  	 * Handles the arrival of a new NMEA line at the given arrival time.
87  	 * 
88  	 * @param line
89  	 * @param arrivalTime
90  	 */
91  	synchronized void line(String line, long arrivalTime) {
92  
93  		if (count.incrementAndGet() % logCountFrequency == 0)
94  			log.info("count=" + count.get() + ",buffer size=" + lines.size());
95  
96  		NmeaMessage nmea;
97  		try {
98  			nmea = NmeaUtil.parseNmea(line);
99  		} catch (NmeaMessageParseException e) {
100 			listener.invalidNmea(line, arrivalTime, e.getMessage());
101 			return;
102 		}
103 
104 		// if is multi line message then don't report to listener till last
105 		// message in sequence has been received.
106 		if (!nmea.isSingleSentence()) {
107 			Optional<List<NmeaMessage>> messages = nmeaBuffer.add(nmea);
108 			if (messages.isPresent()) {
109 				Optional<NmeaMessage> joined = AisNmeaBuffer
110 						.concatenateMessages(messages.get());
111 				if (joined.isPresent()) {
112 					if (joined.get().getUnixTimeMillis() != null)
113 						listener.message(joined.get().toLine(), joined.get()
114 								.getUnixTimeMillis());
115 					else
116 						listener.message(joined.get().toLine(), arrivalTime);
117 				}
118 				// TODO else report error, might need to change signature of
119 				// listener to handle problem with multi-line message
120 			}
121 			return;
122 		}
123 
124 		if (nmea.getUnixTimeMillis() != null) {
125 			listener.message(line, nmea.getUnixTimeMillis());
126 			return;
127 		}
128 
129 		if (!matchWithTimestampLine) {
130 			listener.message(line, arrivalTime);
131 			return;
132 		}
133 
134 		if (!NmeaUtil.isValid(line))
135 			return;
136 
137 		addLine(line, arrivalTime);
138 		log.debug("buffer lines=" + lines.size());
139 		Integer earliestTimestampLineIndex = getEarliestTimestampLineIndex(lines);
140 		Set<Integer> removeThese;
141 		if (earliestTimestampLineIndex != null) {
142 			removeThese = matchWithClosestAisMessageIfBufferLargeEnough(
143 					arrivalTime, earliestTimestampLineIndex);
144 		} else
145 			removeThese = findExpiredIndexesBeforeIndex(lastIndex());
146 		TreeSet<Integer> orderedIndexes = Sets.newTreeSet(removeThese);
147 		for (int index : orderedIndexes.descendingSet()) {
148 			removeLineWithIndex(index);
149 		}
150 	}
151 
152 	/**
153 	 * Returns the last index of the buffered lines.
154 	 * 
155 	 * @return
156 	 */
157 	private int lastIndex() {
158 		return lines.size() - 1;
159 	}
160 
161 	/**
162 	 * Returns the list of those indexes that can be removed from the buffer
163 	 * because they have a timestamp more than
164 	 * MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS from the arrival time of the given
165 	 * index.
166 	 * 
167 	 * @param index
168 	 * @return
169 	 */
170 	private Set<Integer> findExpiredIndexesBeforeIndex(int index) {
171 		long indexTime = getLineTime(index);
172 		Set<Integer> removeThese = Sets.newHashSet();
173 		for (int i = index - 1; i >= 0; i--) {
174 			if (indexTime - getLineTime(i) > MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS) {
175 				listener.timestampNotFound(getLine(i), getLineTime(i));
176 				removeThese.add(i);
177 			}
178 		}
179 		return removeThese;
180 	}
181 
182 	/**
183 	 * Finds matches and reports them to the listeners if buffer has a sufficent
184 	 * span of time in it. Returns the indexes that should be removed from the
185 	 * buffer.
186 	 * 
187 	 * @param arrivalTime
188 	 * @param earliestTimestampLineIndex
189 	 * @return
190 	 */
191 	private Set<Integer> matchWithClosestAisMessageIfBufferLargeEnough(
192 			long arrivalTime, Integer earliestTimestampLineIndex) {
193 		String timestampLine = getLine(earliestTimestampLineIndex);
194 		long time = getLineTime(earliestTimestampLineIndex);
195 		log.debug("ts=" + timestampLine + ",time=" + time);
196 		Set<Integer> removeThese;
197 		if (arrivalTime - time > MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS) {
198 			removeThese = matchWithClosestAisMessageAndFindIndexesToRemove(
199 					earliestTimestampLineIndex, timestampLine, time);
200 		} else
201 			removeThese = findExpiredIndexesBeforeIndex(earliestTimestampLineIndex);
202 		return removeThese;
203 	}
204 
205 	/**
206 	 * 
207 	 * Finds matches and reports them to the listeners. Returns the indexes that
208 	 * should be removed form the buffer.
209 	 * 
210 	 * @param earliestTimestampLineIndex
211 	 * @param timestampLine
212 	 * @param time
213 	 * @return
214 	 */
215 	private Set<Integer> matchWithClosestAisMessageAndFindIndexesToRemove(
216 			int earliestTimestampLineIndex, String timestampLine, long time) {
217 		// find closest ais message in terms of arrival time to the arrival
218 		// time of the timestamp line
219 		NmeaMessageExactEarthTimestamp timestamp = new NmeaMessageExactEarthTimestamp(
220 				timestampLine);
221 		String checksum = timestamp.getFollowingSequenceChecksum();
222 		log.debug("looking for checksum=" + checksum);
223 
224 		Integer lowestTimeDiffIndex = findClosestMatchingMessageInTermsOfArrivalTime(
225 				time, checksum);
226 		Set<Integer> removeThese;
227 		if (lowestTimeDiffIndex != null) {
228 			removeThese = reportMatchAndFindIndexesToRemove(
229 					earliestTimestampLineIndex, timestamp, lowestTimeDiffIndex);
230 		} else
231 			// no match within BUFFER ms so remove
232 			removeThese = Sets.newHashSet(earliestTimestampLineIndex);
233 		return removeThese;
234 	}
235 
236 	/**
237 	 * Returns the index of the closest matching message in terms of arrival
238 	 * time. Matching equates to having a referenced checksum. If not found
239 	 * returns null. If there is a problem parsing the message an
240 	 * {@link AisParseException} is thrown.
241 	 * 
242 	 * @param time
243 	 * @param checksum
244 	 * @return
245 	 */
246 	private Integer findClosestMatchingMessageInTermsOfArrivalTime(long time,
247 			String checksum) {
248 		// find the closest matching ais message in terms of arrival time
249 		Long lowestTimeDiff = null;
250 		Integer lowestTimeDiffIndex = null;
251 		for (int i = 0; i < getNumLines(); i++) {
252 			String msg = getLine(i);
253 			Long msgTime = getLineTime(i);
254 			if (!isExactEarthTimestamp(msg)) {
255 				try {
256 					AisNmeaMessage nmea = new AisNmeaMessage(msg);
257 					if (nmea.getChecksum().equals(checksum)) {
258 						long diff = Math.abs(msgTime - time);
259 						boolean closer = (lowestTimeDiff == null || diff < lowestTimeDiff)
260 								&& (diff <= MAXIMUM_ARRIVAL_TIME_DIFFERENCE_MS);
261 						if (closer) {
262 							lowestTimeDiff = diff;
263 							lowestTimeDiffIndex = i;
264 						}
265 					}
266 				} catch (AisParseException e) {
267 					log.debug(e.getMessage());
268 				}
269 			}
270 		}
271 		return lowestTimeDiffIndex;
272 	}
273 
274 	/**
275 	 * Reports a found match and returns the indexes that can be removed from
276 	 * the buffer.
277 	 * 
278 	 * @param earliestTimestampLineIndex
279 	 * @param timestamp
280 	 * @param lowestTimeDiffIndex
281 	 * @return
282 	 */
283 	private Set<Integer> reportMatchAndFindIndexesToRemove(
284 			Integer earliestTimestampLineIndex,
285 			NmeaMessageExactEarthTimestamp timestamp,
286 			Integer lowestTimeDiffIndex) {
287 		String msg = getLine(lowestTimeDiffIndex);
288 		log.debug("found matching msg=" + msg);
289 		listener.message(msg, timestamp.getTime());
290 		int maxIndex = Math
291 				.max(lowestTimeDiffIndex, earliestTimestampLineIndex);
292 		int minIndex = Math
293 				.min(lowestTimeDiffIndex, earliestTimestampLineIndex);
294 
295 		// now remove from lists must remove bigger index first,
296 		// otherwise indexes will change
297 		return Sets.newHashSet(minIndex, maxIndex);
298 	}
299 
300 	/**
301 	 * Add a line to the buffer.
302 	 * 
303 	 * @param line
304 	 * @param time
305 	 */
306 	private void addLine(String line, long time) {
307 		lines.add(new LineAndTime(line, time));
308 	}
309 
310 	/**
311 	 * Returns the buffer size.
312 	 * 
313 	 * @return
314 	 */
315 	private int getNumLines() {
316 		return lines.size();
317 	}
318 
319 	/**
320 	 * Returns the buffer line at given index (zero based).
321 	 * 
322 	 * @param index
323 	 * @return
324 	 */
325 	private String getLine(int index) {
326 		return lines.get(index).getLine();
327 	}
328 
329 	/**
330 	 * Returns the arrival time of line at given index (zero based).
331 	 * 
332 	 * @param index
333 	 * @return
334 	 */
335 	private long getLineTime(int index) {
336 		return lines.get(index).getTime();
337 	}
338 
339 	/**
340 	 * Remove line from the buffer with given index (zero based).
341 	 * 
342 	 * @param index
343 	 */
344 	private void removeLineWithIndex(int index) {
345 		lines.remove(index);
346 	}
347 
348 	/**
349 	 * Returns a copy of the buffer.
350 	 * 
351 	 * @return
352 	 */
353 	@VisibleForTesting
354 	List<LineAndTime> getBuffer() {
355 		return Lists.newArrayList(lines);
356 	}
357 
358 	/**
359 	 * Returns the index of the earliest timestamp (PGHP) line. If none found
360 	 * returns null.
361 	 * 
362 	 * @param lines
363 	 * @return
364 	 */
365 	private static Integer getEarliestTimestampLineIndex(List<LineAndTime> lines) {
366 		Integer i = 0;
367 		for (LineAndTime line : lines) {
368 			if (isExactEarthTimestamp(line.getLine()))
369 				return i;
370 			else
371 				i++;
372 		}
373 		return null;
374 	}
375 
376 }