View Javadoc
1   package au.gov.amsa.navigation.ais;
2   
3   import static com.google.common.base.Optional.fromNullable;
4   import static com.google.common.base.Optional.of;
5   
6   import java.util.Comparator;
7   
8   import rx.Observable;
9   import rx.Observable.Transformer;
10  import rx.functions.Func1;
11  import au.gov.amsa.ais.AisMessage;
12  import au.gov.amsa.ais.message.AisPosition;
13  import au.gov.amsa.ais.message.AisPositionA;
14  import au.gov.amsa.ais.rx.Streams;
15  import au.gov.amsa.ais.rx.Streams.TimestampedAndLine;
16  import au.gov.amsa.navigation.Mmsi;
17  import au.gov.amsa.navigation.VesselClass;
18  import au.gov.amsa.navigation.VesselPosition;
19  import au.gov.amsa.navigation.VesselPosition.NavigationalStatus;
20  
21  import com.google.common.base.Optional;
22  
23  public class AisVesselPositions {
24  
25      public static Observable<VesselPosition> positions(Observable<String> nmea) {
26          return Streams.extract(nmea).filter(isPresent())
27          // aggregate ship data with the message
28                  .scan(new AisMessageAndVesselData(), AisMessageAndVesselData.aggregate)
29                  // positions only
30                  .filter(isPosition)
31                  // convert to vessel positions
32                  .map(toVesselPosition);
33      }
34  
35      public static Transformer<String, VesselPosition> positions() {
36          return nmea -> positions(nmea);
37      }
38  
39      private static Func1<TimestampedAndLine<AisMessage>, Boolean> isPresent() {
40          return t -> t.getMessage().isPresent();
41      }
42  
43      public static Observable<TimestampedAndLine<AisMessage>> sortByTime(
44              Observable<TimestampedAndLine<AisMessage>> source) {
45          Comparator<TimestampedAndLine<AisMessage>> comparator = (t1, t2) -> ((Long) t1.getMessage()
46                  .get().time()).compareTo(t2.getMessage().get().time());
47          return source
48          // sort by time
49                  .lift(new SortOperator<TimestampedAndLine<AisMessage>>(comparator, 20000000));
50      }
51  
52      public static Observable<VesselPosition> positionsSortedByTime(Observable<String> nmea) {
53          return sortByTime(Streams.extract(nmea))
54          // aggregate ship data with the message
55                  .scan(new AisMessageAndVesselData(), AisMessageAndVesselData.aggregate)
56                  // positions only, with lat long present
57                  .filter(isPosition)
58                  // convert to vessel positions
59                  .map(toVesselPosition);
60      }
61  
62      private static final Func1<AisMessageAndVesselData, Boolean> isPosition = m -> {
63          if (m.message().isPresent()
64                  && m.message().get().getMessage().get().message() instanceof AisPosition) {
65              AisPosition p = (AisPosition) m.message().get().getMessage().get().message();
66              return (p.getLatitude() != null && p.getLongitude() != null);
67          } else
68              return false;
69      };
70  
71      private static final Func1<AisMessageAndVesselData, VesselPosition> toVesselPosition = messageAndData -> {
72  
73          AisPosition p = (AisPosition) messageAndData.message().get().getMessage().get().message();
74  
75          VesselClass cls;
76          if (p instanceof AisPositionA)
77              cls = VesselClass.A;
78          else
79              cls = VesselClass.B;
80          Mmsi id = new Mmsi(p.getMmsi());
81          Optional<Vessel> vessel = messageAndData.data().get(id);
82          Optional<Integer> lengthMetres = vessel.isPresent() ? vessel.get().getLengthMetres()
83                  : Optional.<Integer> absent();
84  
85          Optional<Integer> widthMetres = vessel.isPresent() ? vessel.get().getWidthMetres()
86                  : Optional.<Integer> absent();
87  
88          Optional<Double> speedMetresPerSecond = p.getSpeedOverGroundKnots() != null ? of(p
89                  .getSpeedOverGroundKnots() * 0.5144444444) : Optional.<Double> absent();
90  
91          Optional<Integer> shipType = vessel.isPresent() ? vessel.get().getShipType() : Optional
92                  .<Integer> absent();
93  
94          NavigationalStatus navigationalStatus;
95          if (p instanceof AisPositionA) {
96              AisPositionA a = (AisPositionA) p;
97              if (Util.equals(a.getNavigationalStatus(),
98                      au.gov.amsa.ais.message.NavigationalStatus.AT_ANCHOR))
99                  navigationalStatus = NavigationalStatus.AT_ANCHOR;
100             else if (Util.equals(a.getNavigationalStatus(),
101                     au.gov.amsa.ais.message.NavigationalStatus.MOORED)) {
102                 navigationalStatus = NavigationalStatus.MOORED;
103             } else
104                 navigationalStatus = NavigationalStatus.NOT_DEFINED;
105         } else
106             navigationalStatus = NavigationalStatus.NOT_DEFINED;
107 
108         Optional<String> positionAisNmea;
109         if (p instanceof AisPositionA) {
110             positionAisNmea = Optional.of(messageAndData.message().get().getLine());
111         } else
112             positionAisNmea = Optional.absent();
113 
114         Optional<String> shipStaticAisNmea;
115         if (vessel.isPresent())
116             shipStaticAisNmea = vessel.get().getNmea();
117         else
118             shipStaticAisNmea = Optional.absent();
119 
120         // TODO adjust lat, lon for position of ais set on ship
121         // given by A,B,C,D? Or instead store the position offset in
122         // metres in VesselPosition (preferred because RateOfTurn
123         // (ROT) may enter the picture later).
124         return VesselPosition.builder()
125         // cog
126                 .cogDegrees(fromNullable(p.getCourseOverGround()))
127                 // heading
128                 .headingDegrees(fromNullable(toDouble(p.getTrueHeading())))
129                 // speed
130                 .speedMetresPerSecond(speedMetresPerSecond)
131                 // lat
132                 .lat(p.getLatitude())
133                 // lon
134                 .lon(p.getLongitude())
135                 // id
136                 .id(id)
137                 // length
138                 .lengthMetres(lengthMetres)
139                 // width
140                 .widthMetres(widthMetres)
141                 // time
142                 .time(messageAndData.message().get().getMessage().get().time())
143                 // ship type
144                 .shipType(shipType)
145                 // class
146                 .cls(cls)
147                 // at anchor
148                 .navigationalStatus(navigationalStatus)
149                 // position nmea
150                 .positionAisNmea(positionAisNmea)
151                 // ship static nmea
152                 .shipStaticAisNmea(shipStaticAisNmea)
153                 // build it
154                 .build();
155     };
156 
157     private static Double toDouble(Number i) {
158         if (i == null)
159             return null;
160         else
161             return i.doubleValue();
162     }
163 
164 }