1 package au.gov.amsa.ais.rx.operators;
2
3 import rx.Observable.Operator;
4 import rx.Subscriber;
5 import au.gov.amsa.ais.AisMessage;
6 import au.gov.amsa.ais.HasMmsi;
7 import au.gov.amsa.ais.Timestamped;
8 import au.gov.amsa.ais.message.AisPositionBExtended;
9 import au.gov.amsa.ais.message.AisShipStaticA;
10 import au.gov.amsa.ais.rx.CraftProperty;
11 import au.gov.amsa.ais.rx.CraftPropertyName;
12 import au.gov.amsa.ais.rx.Mmsi;
13
14 public class OperatorCraftProperty implements
15 Operator<CraftProperty, Timestamped<? extends AisMessage>> {
16
17 @Override
18 public Subscriber<? super Timestamped<? extends AisMessage>> call(
19 final Subscriber<? super CraftProperty> child) {
20
21 return new Subscriber<Timestamped<? extends AisMessage>>(child) {
22
23 @Override
24 public void onCompleted() {
25 if (!isUnsubscribed())
26 child.onCompleted();
27 }
28
29 @Override
30 public void onError(Throwable e) {
31 if (!isUnsubscribed())
32 child.onError(e);
33 }
34
35 @SuppressWarnings("unchecked")
36 @Override
37 public void onNext(Timestamped<? extends AisMessage> m) {
38 if (m.message() instanceof AisShipStaticA) {
39 handleShipStatic((Timestamped<AisShipStaticA>) m, child);
40 } else if (m.message() instanceof AisPositionBExtended)
41 handleAisPositionBExtended(
42 (Timestamped<AisPositionBExtended>) m, child);
43 }
44
45 private void handleShipStatic(Timestamped<AisShipStaticA> m,
46 Subscriber<? super CraftProperty> child) {
47 handleProperty(child, m, CraftPropertyName.CALLSIGN, m
48 .message().getCallsign());
49 handleProperty(child, m, CraftPropertyName.DESTINATION, m
50 .message().getDestination());
51 handleProperty(child, m, CraftPropertyName.DIMENSION_A, m
52 .message().getDimensionA());
53 handleProperty(child, m, CraftPropertyName.DIMENSION_B, m
54 .message().getDimensionB());
55 handleProperty(child, m, CraftPropertyName.DIMENSION_C, m
56 .message().getDimensionC());
57 handleProperty(child, m, CraftPropertyName.DIMENSION_D, m
58 .message().getDimensionD());
59 handleProperty(child, m, CraftPropertyName.IMO_NUMBER, m
60 .message().getImo());
61 handleProperty(child, m, CraftPropertyName.LENGTH_METRES, m
62 .message().getLengthMetres());
63 handleProperty(child, m, CraftPropertyName.DRAUGHT_METRES, m
64 .message().getMaximumPresentStaticDraughtMetres());
65 handleProperty(child, m, CraftPropertyName.NAME, m.message()
66 .getName());
67 handleProperty(child, m, CraftPropertyName.SHIP_TYPE, m
68 .message().getShipType());
69 handleProperty(child, m, CraftPropertyName.WIDTH_METRES, m
70 .message().getWidthMetres());
71 }
72
73 private void handleAisPositionBExtended(
74 Timestamped<AisPositionBExtended> m,
75 Subscriber<? super CraftProperty> child) {
76 handleProperty(child, m, CraftPropertyName.DIMENSION_A, m
77 .message().getDimensionA());
78 handleProperty(child, m, CraftPropertyName.DIMENSION_B, m
79 .message().getDimensionB());
80 handleProperty(child, m, CraftPropertyName.DIMENSION_C, m
81 .message().getDimensionC());
82 handleProperty(child, m, CraftPropertyName.DIMENSION_D, m
83 .message().getDimensionD());
84 handleProperty(child, m, CraftPropertyName.LENGTH_METRES, m
85 .message().getLengthMetres());
86 handleProperty(child, m, CraftPropertyName.NAME, m.message()
87 .getName());
88 handleProperty(child, m, CraftPropertyName.SHIP_TYPE, m
89 .message().getShipType());
90 handleProperty(child, m, CraftPropertyName.WIDTH_METRES, m
91 .message().getWidthMetres());
92 }
93
94 private <R extends AisMessage & HasMmsi> void handleProperty(
95 Subscriber<? super CraftProperty> child,
96 Timestamped<R > m,
97 CraftPropertyName name, Object value) {
98 if (!isUnsubscribed() && value != null)
99 child.onNext(new CraftProperty(new Mmsi(m.message()
100 .getMmsi()), name, value.toString(), m.time()));
101 }
102 };
103 }
104 }