1 package au.gov.amsa.navigation.ais;
2
3 import static com.google.common.base.Optional.fromNullable;
4 import static com.google.common.base.Optional.of;
5
6 import java.util.Comparator;
7
8 import rx.Observable;
9 import rx.Observable.Transformer;
10 import rx.functions.Func1;
11 import au.gov.amsa.ais.AisMessage;
12 import au.gov.amsa.ais.message.AisPosition;
13 import au.gov.amsa.ais.message.AisPositionA;
14 import au.gov.amsa.ais.rx.Streams;
15 import au.gov.amsa.ais.rx.Streams.TimestampedAndLine;
16 import au.gov.amsa.navigation.Mmsi;
17 import au.gov.amsa.navigation.VesselClass;
18 import au.gov.amsa.navigation.VesselPosition;
19 import au.gov.amsa.navigation.VesselPosition.NavigationalStatus;
20
21 import com.google.common.base.Optional;
22
23 public class AisVesselPositions {
24
25 public static Observable<VesselPosition> positions(Observable<String> nmea) {
26 return Streams.extract(nmea).filter(isPresent())
27
28 .scan(new AisMessageAndVesselData(), AisMessageAndVesselData.aggregate)
29
30 .filter(isPosition)
31
32 .map(toVesselPosition);
33 }
34
35 public static Transformer<String, VesselPosition> positions() {
36 return nmea -> positions(nmea);
37 }
38
39 private static Func1<TimestampedAndLine<AisMessage>, Boolean> isPresent() {
40 return t -> t.getMessage().isPresent();
41 }
42
43 public static Observable<TimestampedAndLine<AisMessage>> sortByTime(
44 Observable<TimestampedAndLine<AisMessage>> source) {
45 Comparator<TimestampedAndLine<AisMessage>> comparator = (t1, t2) -> ((Long) t1.getMessage()
46 .get().time()).compareTo(t2.getMessage().get().time());
47 return source
48
49 .lift(new SortOperator<TimestampedAndLine<AisMessage>>(comparator, 20000000));
50 }
51
52 public static Observable<VesselPosition> positionsSortedByTime(Observable<String> nmea) {
53 return sortByTime(Streams.extract(nmea))
54
55 .scan(new AisMessageAndVesselData(), AisMessageAndVesselData.aggregate)
56
57 .filter(isPosition)
58
59 .map(toVesselPosition);
60 }
61
62 private static final Func1<AisMessageAndVesselData, Boolean> isPosition = m -> {
63 if (m.message().isPresent()
64 && m.message().get().getMessage().get().message() instanceof AisPosition) {
65 AisPosition p = (AisPosition) m.message().get().getMessage().get().message();
66 return (p.getLatitude() != null && p.getLongitude() != null);
67 } else
68 return false;
69 };
70
71 private static final Func1<AisMessageAndVesselData, VesselPosition> toVesselPosition = messageAndData -> {
72
73 AisPosition p = (AisPosition) messageAndData.message().get().getMessage().get().message();
74
75 VesselClass cls;
76 if (p instanceof AisPositionA)
77 cls = VesselClass.A;
78 else
79 cls = VesselClass.B;
80 Mmsi id = new Mmsi(p.getMmsi());
81 Optional<Vessel> vessel = messageAndData.data().get(id);
82 Optional<Integer> lengthMetres = vessel.isPresent() ? vessel.get().getLengthMetres()
83 : Optional.<Integer> absent();
84
85 Optional<Integer> widthMetres = vessel.isPresent() ? vessel.get().getWidthMetres()
86 : Optional.<Integer> absent();
87
88 Optional<Double> speedMetresPerSecond = p.getSpeedOverGroundKnots() != null ? of(p
89 .getSpeedOverGroundKnots() * 0.5144444444) : Optional.<Double> absent();
90
91 Optional<Integer> shipType = vessel.isPresent() ? vessel.get().getShipType() : Optional
92 .<Integer> absent();
93
94 NavigationalStatus navigationalStatus;
95 if (p instanceof AisPositionA) {
96 AisPositionA a = (AisPositionA) p;
97 if (Util.equals(a.getNavigationalStatus(),
98 au.gov.amsa.ais.message.NavigationalStatus.AT_ANCHOR))
99 navigationalStatus = NavigationalStatus.AT_ANCHOR;
100 else if (Util.equals(a.getNavigationalStatus(),
101 au.gov.amsa.ais.message.NavigationalStatus.MOORED)) {
102 navigationalStatus = NavigationalStatus.MOORED;
103 } else
104 navigationalStatus = NavigationalStatus.NOT_DEFINED;
105 } else
106 navigationalStatus = NavigationalStatus.NOT_DEFINED;
107
108 Optional<String> positionAisNmea;
109 if (p instanceof AisPositionA) {
110 positionAisNmea = Optional.of(messageAndData.message().get().getLine());
111 } else
112 positionAisNmea = Optional.absent();
113
114 Optional<String> shipStaticAisNmea;
115 if (vessel.isPresent())
116 shipStaticAisNmea = vessel.get().getNmea();
117 else
118 shipStaticAisNmea = Optional.absent();
119
120
121
122
123
124 return VesselPosition.builder()
125
126 .cogDegrees(fromNullable(p.getCourseOverGround()))
127
128 .headingDegrees(fromNullable(toDouble(p.getTrueHeading())))
129
130 .speedMetresPerSecond(speedMetresPerSecond)
131
132 .lat(p.getLatitude())
133
134 .lon(p.getLongitude())
135
136 .id(id)
137
138 .lengthMetres(lengthMetres)
139
140 .widthMetres(widthMetres)
141
142 .time(messageAndData.message().get().getMessage().get().time())
143
144 .shipType(shipType)
145
146 .cls(cls)
147
148 .navigationalStatus(navigationalStatus)
149
150 .positionAisNmea(positionAisNmea)
151
152 .shipStaticAisNmea(shipStaticAisNmea)
153
154 .build();
155 };
156
157 private static Double toDouble(Number i) {
158 if (i == null)
159 return null;
160 else
161 return i.doubleValue();
162 }
163
164 }