View Javadoc
1   package au.gov.amsa.ais;
2   
3   import java.util.Collection;
4   import java.util.Collections;
5   import java.util.Comparator;
6   import java.util.LinkedHashMap;
7   import java.util.List;
8   import java.util.concurrent.atomic.AtomicBoolean;
9   
10  import com.google.common.base.Optional;
11  import com.google.common.collect.LinkedHashMultimap;
12  import com.google.common.collect.Lists;
13  
14  import au.gov.amsa.util.nmea.NmeaMessage;
15  import au.gov.amsa.util.nmea.NmeaMessageParseException;
16  import au.gov.amsa.util.nmea.NmeaUtil;
17  
18  public class AisNmeaBuffer {
19  
20      private static final int AIS_MESSAGE_COL_NO = 5;
21      private static final int MIN_NUM_COLS_FOR_LINE_TO_BE_AGGREGATED = 6;
22      private static final int COLUMN_TO_AGGREGATE = AIS_MESSAGE_COL_NO;
23      private final int maxBufferSize;
24      private final LinkedHashMultimap<String, NmeaMessage> buffer;
25      private final AtomicBoolean adding = new AtomicBoolean();
26  
27      public AisNmeaBuffer(int maxBufferSize) {
28          this.maxBufferSize = maxBufferSize;
29          this.buffer = LinkedHashMultimap.create();
30      }
31  
32      /**
33       * Returns the complete message only once the whole group of messages has
34       * arrived otherwise returns null.
35       * 
36       * @param nmea
37       * @return
38       */
39      public Optional<List<NmeaMessage>> add(NmeaMessage nmea) {
40          // use compare-and-swap semantics instead of synchronizing to squeak a
41          // bit more performance out of this. Contention is expected to be low so
42          // this should help.
43          while (true) {
44              if (adding.compareAndSet(false, true)) {
45                  Optional<List<NmeaMessage>> result = doAdd(nmea);
46                  adding.set(false);
47                  return result;
48              }
49          }
50      }
51  
52      private Optional<List<NmeaMessage>> doAdd(NmeaMessage nmea) {
53          List<String> items = nmea.getItems();
54          if (items.size() > 0 && items.size() < MIN_NUM_COLS_FOR_LINE_TO_BE_AGGREGATED)
55              return Optional.of(Collections.singletonList(nmea));
56          if (nmea.isSingleSentence()) {
57              return Optional.of(Collections.singletonList(nmea));
58          } else {
59              String groupId = nmea.getSentenceGroupId();
60              buffer.put(groupId, nmea);
61              // trim the oldest if we have reached max buffer size
62              if (buffer.size() > maxBufferSize)
63                  buffer.removeAll(buffer.keySet().iterator().next());
64              int numGroupMessages = nmea.getSentenceCount();
65              int numGroupMessagesSoFar = buffer.get(groupId).size();
66              if (numGroupMessagesSoFar == numGroupMessages) {
67                  // we have all messages in that group now so concatenate
68                  List<NmeaMessage> list = orderMessages(buffer.get(groupId));
69                  // NmeaMessage concatenatedMessage = concatenateMessages(list);
70                  buffer.removeAll(groupId);
71                  return Optional.of(list);
72              } else
73                  return Optional.absent();
74          }
75      }
76  
77      /**
78       * Returns the aggregated message or if an {@link NmeaMessageParseException}
79       * occurs returns null.
80       * 
81       * @param list
82       * @return
83       */
84      public static Optional<NmeaMessage> concatenateMessages(List<NmeaMessage> list) {
85          if (list.size() == 1)
86              return Optional.of(list.get(0));
87  
88          NmeaMessage first = list.get(0);
89          // concatenate column 5 and use row 1 tag block
90  
91          // copy tags so we can modify
92          LinkedHashMap<String, String> tags = new LinkedHashMap<String, String>(first.getTags());
93  
94          StringBuilder s = new StringBuilder();
95          List<String> foundItems = null;
96          for (NmeaMessage t : list) {
97              if (!t.getItems().isEmpty()) {
98                  // parse the num and count values from the NMEA colums
99                  // an VSI message will fail this check
100                 try {
101                     Integer.parseInt(t.getItems().get(1));
102                     Integer.parseInt(t.getItems().get(2));
103                     s.append(t.getItems().get(COLUMN_TO_AGGREGATE));
104                     foundItems = t.getItems();
105                 } catch (NumberFormatException e) {
106                     // ignore
107                 }
108             }
109         }
110         if (foundItems == null) {
111             return Optional.absent();
112         }
113         // copy cols so we can modify, don't want to affect the original nmea
114         // message
115         List<String> cols = Lists.newArrayList(foundItems);
116         cols.set(COLUMN_TO_AGGREGATE, s.toString());
117         // set num sentences to be 1 and current sentence number to be 1
118         cols.set(1, "1");
119         cols.set(2, "1");
120 
121         tags.put("g", "1-1-" + first.getSentenceGroupId());
122         try {
123             String checksum = NmeaUtil.getChecksum(NmeaUtil.createNmeaLine(tags, cols));
124             NmeaMessage message = new NmeaMessage(tags, cols, checksum);
125             return Optional.of(message);
126         } catch (NmeaMessageParseException e) {
127             return Optional.absent();
128         }
129     }
130 
131     private static List<NmeaMessage> orderMessages(Collection<NmeaMessage> c) {
132         List<NmeaMessage> list = Lists.newArrayList(c);
133         Collections.sort(list, new Comparator<NmeaMessage>() {
134             @Override
135             public int compare(NmeaMessage a, NmeaMessage b) {
136                 return a.getSentenceNumber().compareTo(b.getSentenceNumber());
137             }
138         });
139         return list;
140     }
141 
142     public int size() {
143         return buffer.size();
144     }
145 }