1 package au.gov.amsa.navigation.ais;
2
3 import rx.functions.Func2;
4 import au.gov.amsa.ais.AisMessage;
5 import au.gov.amsa.ais.message.AisPositionBExtended;
6 import au.gov.amsa.ais.message.AisShipStaticA;
7 import au.gov.amsa.ais.rx.Streams.TimestampedAndLine;
8
9 import com.google.common.base.Optional;
10
11 public final class AisMessageAndVesselData {
12
13 private Optional<TimestampedAndLine<AisMessage>> message;
14 private VesselData data;
15
16 public AisMessageAndVesselData(Optional<TimestampedAndLine<AisMessage>> message, VesselData data) {
17 this.message = message;
18 this.data = data;
19 }
20
21 public AisMessageAndVesselData() {
22 this(Optional.<TimestampedAndLine<AisMessage>> absent(), new VesselData());
23 }
24
25 public Optional<TimestampedAndLine<AisMessage>> message() {
26 return message;
27 }
28
29 public VesselData data() {
30 return data;
31 }
32
33 public static Func2<AisMessageAndVesselData, TimestampedAndLine<AisMessage>, AisMessageAndVesselData> aggregate = (
34 messageAndData, message) -> {
35 if (!message.getMessage().isPresent())
36 throw new RuntimeException("unexpected");
37 Optional<String> line = Optional.of(message.getLine());
38 AisMessage m = message.getMessage().get().message();
39 if (m instanceof AisShipStaticA)
40 return new AisMessageAndVesselData(Optional.of(message), messageAndData.data().add(
41 (AisShipStaticA) m, line));
42 else if (m instanceof AisPositionBExtended)
43 return new AisMessageAndVesselData(Optional.of(message), messageAndData.data().add(
44 (AisPositionBExtended) m, line));
45 else
46 return new AisMessageAndVesselData(Optional.of(message), messageAndData.data());
47 };
48
49 }