View Javadoc
1   package au.gov.amsa.navigation.ais;
2   
3   import rx.functions.Func2;
4   import au.gov.amsa.ais.AisMessage;
5   import au.gov.amsa.ais.message.AisPositionBExtended;
6   import au.gov.amsa.ais.message.AisShipStaticA;
7   import au.gov.amsa.ais.rx.Streams.TimestampedAndLine;
8   
9   import com.google.common.base.Optional;
10  
11  public final class AisMessageAndVesselData {
12  
13      private Optional<TimestampedAndLine<AisMessage>> message;
14      private VesselData data;
15  
16      public AisMessageAndVesselData(Optional<TimestampedAndLine<AisMessage>> message, VesselData data) {
17          this.message = message;
18          this.data = data;
19      }
20  
21      public AisMessageAndVesselData() {
22          this(Optional.<TimestampedAndLine<AisMessage>> absent(), new VesselData());
23      }
24  
25      public Optional<TimestampedAndLine<AisMessage>> message() {
26          return message;
27      }
28  
29      public VesselData data() {
30          return data;
31      }
32  
33      public static Func2<AisMessageAndVesselData, TimestampedAndLine<AisMessage>, AisMessageAndVesselData> aggregate = (
34              messageAndData, message) -> {
35          if (!message.getMessage().isPresent())
36              throw new RuntimeException("unexpected");
37          Optional<String> line = Optional.of(message.getLine());
38          AisMessage m = message.getMessage().get().message();
39          if (m instanceof AisShipStaticA)
40              return new AisMessageAndVesselData(Optional.of(message), messageAndData.data().add(
41                      (AisShipStaticA) m, line));
42          else if (m instanceof AisPositionBExtended)
43              return new AisMessageAndVesselData(Optional.of(message), messageAndData.data().add(
44                      (AisPositionBExtended) m, line));
45          else
46              return new AisMessageAndVesselData(Optional.of(message), messageAndData.data());
47      };
48  
49  }