1 package au.gov.amsa.ais;
2
3 import java.util.Collection;
4 import java.util.Collections;
5 import java.util.Comparator;
6 import java.util.LinkedHashMap;
7 import java.util.List;
8 import java.util.concurrent.atomic.AtomicBoolean;
9
10 import com.google.common.base.Optional;
11 import com.google.common.collect.LinkedHashMultimap;
12 import com.google.common.collect.Lists;
13
14 import au.gov.amsa.util.nmea.NmeaMessage;
15 import au.gov.amsa.util.nmea.NmeaMessageParseException;
16 import au.gov.amsa.util.nmea.NmeaUtil;
17
18 public class AisNmeaBuffer {
19
20 private static final int AIS_MESSAGE_COL_NO = 5;
21 private static final int MIN_NUM_COLS_FOR_LINE_TO_BE_AGGREGATED = 6;
22 private static final int COLUMN_TO_AGGREGATE = AIS_MESSAGE_COL_NO;
23 private final int maxBufferSize;
24 private final LinkedHashMultimap<String, NmeaMessage> buffer;
25 private final AtomicBoolean adding = new AtomicBoolean();
26
27 public AisNmeaBuffer(int maxBufferSize) {
28 this.maxBufferSize = maxBufferSize;
29 this.buffer = LinkedHashMultimap.create();
30 }
31
32
33
34
35
36
37
38
39 public Optional<List<NmeaMessage>> add(NmeaMessage nmea) {
40
41
42
43 while (true) {
44 if (adding.compareAndSet(false, true)) {
45 Optional<List<NmeaMessage>> result = doAdd(nmea);
46 adding.set(false);
47 return result;
48 }
49 }
50 }
51
52 private Optional<List<NmeaMessage>> doAdd(NmeaMessage nmea) {
53 List<String> items = nmea.getItems();
54 if (items.size() > 0 && items.size() < MIN_NUM_COLS_FOR_LINE_TO_BE_AGGREGATED)
55 return Optional.of(Collections.singletonList(nmea));
56 if (nmea.isSingleSentence()) {
57 return Optional.of(Collections.singletonList(nmea));
58 } else {
59 String groupId = nmea.getSentenceGroupId();
60 buffer.put(groupId, nmea);
61
62 if (buffer.size() > maxBufferSize)
63 buffer.removeAll(buffer.keySet().iterator().next());
64 int numGroupMessages = nmea.getSentenceCount();
65 int numGroupMessagesSoFar = buffer.get(groupId).size();
66 if (numGroupMessagesSoFar == numGroupMessages) {
67
68 List<NmeaMessage> list = orderMessages(buffer.get(groupId));
69
70 buffer.removeAll(groupId);
71 return Optional.of(list);
72 } else
73 return Optional.absent();
74 }
75 }
76
77
78
79
80
81
82
83
84 public static Optional<NmeaMessage> concatenateMessages(List<NmeaMessage> list) {
85 if (list.size() == 1)
86 return Optional.of(list.get(0));
87
88 NmeaMessage first = list.get(0);
89
90
91
92 LinkedHashMap<String, String> tags = new LinkedHashMap<String, String>(first.getTags());
93
94 StringBuilder s = new StringBuilder();
95 List<String> foundItems = null;
96 for (NmeaMessage t : list) {
97 if (!t.getItems().isEmpty()) {
98
99
100 try {
101 Integer.parseInt(t.getItems().get(1));
102 Integer.parseInt(t.getItems().get(2));
103 s.append(t.getItems().get(COLUMN_TO_AGGREGATE));
104 foundItems = t.getItems();
105 } catch (NumberFormatException e) {
106
107 }
108 }
109 }
110 if (foundItems == null) {
111 return Optional.absent();
112 }
113
114
115 List<String> cols = Lists.newArrayList(foundItems);
116 cols.set(COLUMN_TO_AGGREGATE, s.toString());
117
118 cols.set(1, "1");
119 cols.set(2, "1");
120
121 tags.put("g", "1-1-" + first.getSentenceGroupId());
122 try {
123 String checksum = NmeaUtil.getChecksum(NmeaUtil.createNmeaLine(tags, cols));
124 NmeaMessage message = new NmeaMessage(tags, cols, checksum);
125 return Optional.of(message);
126 } catch (NmeaMessageParseException e) {
127 return Optional.absent();
128 }
129 }
130
131 private static List<NmeaMessage> orderMessages(Collection<NmeaMessage> c) {
132 List<NmeaMessage> list = Lists.newArrayList(c);
133 Collections.sort(list, new Comparator<NmeaMessage>() {
134 @Override
135 public int compare(NmeaMessage a, NmeaMessage b) {
136 return a.getSentenceNumber().compareTo(b.getSentenceNumber());
137 }
138 });
139 return list;
140 }
141
142 public int size() {
143 return buffer.size();
144 }
145 }