1 package au.gov.amsa.animator;
2
3 import static au.gov.amsa.geo.distance.EffectiveSpeedChecker.effectiveSpeedOk;
4
5 import java.io.File;
6 import java.util.Collection;
7 import java.util.Date;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Queue;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ConcurrentLinkedQueue;
13 import java.util.concurrent.TimeUnit;
14
15 import com.github.davidmoten.util.MapWithIndex;
16
17 import au.gov.amsa.ais.rx.Streams;
18 import au.gov.amsa.geo.distance.EffectiveSpeedChecker;
19 import au.gov.amsa.geo.model.SegmentOptions;
20 import au.gov.amsa.risky.format.Fix;
21 import au.gov.amsa.risky.format.FixImpl;
22 import rx.Observable;
23 import rx.Subscriber;
24 import rx.functions.Func1;
25 import rx.internal.util.UtilityFunctions;
26 import rx.schedulers.Schedulers;
27
28 public class ModelManyCraft implements Model {
29
30 private final FixesSubscriber subscriber;
31 private final int fixesPerModelStep;
32 private volatile long stepNumber;
33
34 public ModelManyCraft(Observable<Fix> fixes, int fixesPerModelStep) {
35 this.fixesPerModelStep = fixesPerModelStep;
36 this.subscriber = new FixesSubscriber();
37 Observable<Fix> source = fixes
38
39 .cache()
40
41 .doOnCompleted(() -> subscriber.reset())
42
43 .repeat()
44
45 .doOnCompleted(() -> {
46 System.out.println("completed");
47 });
48 source.subscribeOn(Schedulers.io()).subscribe(subscriber);
49 }
50
51 @Override
52 public void updateModel(long stepNumber) {
53 this.stepNumber = stepNumber;
54 subscriber.requestMore(fixesPerModelStep);
55 }
56
57 @SuppressWarnings("unchecked")
58 @Override
59 public Map<Integer, Collection<Fix>> recent() {
60 return (Map<Integer, Collection<Fix>>) ((Map<Integer, ?>) subscriber.queues);
61 }
62
63 @Override
64 public long stepNumber() {
65 return stepNumber;
66 }
67
68 private static class FixesSubscriber extends Subscriber<Fix> {
69
70 private final ConcurrentHashMap<Integer, Queue<Fix>> queues = new ConcurrentHashMap<>();
71 private final ConcurrentHashMap<Integer, Fix> lastFix = new ConcurrentHashMap<>();
72 private final int maxSize = 10;
73 private final SegmentOptions options = SegmentOptions.builder().build();
74
75 synchronized void reset() {
76 queues.clear();
77 lastFix.clear();
78 }
79
80 @Override
81 public void onStart() {
82 request(0);
83 }
84
85 public void requestMore(long n) {
86 request(n);
87 }
88
89 @Override
90 public void onCompleted() {
91 System.out.println("finished");
92 }
93
94 @Override
95 public void onError(Throwable e) {
96 e.printStackTrace();
97 }
98
99 @Override
100 public void onNext(Fix f) {
101 Queue<Fix> queue = queues.computeIfAbsent(f.mmsi(),
102 mmsi -> new ConcurrentLinkedQueue<Fix>());
103 if (queue.size() == maxSize)
104 queue.poll();
105 Fix last = lastFix.get(f.mmsi());
106 if (last == null || f.time() >= last.time() + 600000 && effectiveSpeedOk(last.time(),
107 last.lat(), last.lon(), f.time(), f.lat(), f.lon(), options)) {
108 queue.add(f);
109 lastFix.put(f.mmsi(), f);
110 }
111 }
112 }
113
114 private static Func1<List<Fix>, Fix> extrapolateToNext(long startTime, long intervalMs) {
115 return list -> {
116 if (list.size() == 0)
117 throw new RuntimeException("unexpected");
118 else {
119 Fix a = list.get(0);
120
121 if (list.size() == 1) {
122 Fix b = a;
123 long t = nextIntervalStartTime(startTime, intervalMs, a);
124 return new FixImpl(b.mmsi(), b.lat(), b.lon(), t, b.latencySeconds(),
125 b.source(), b.navigationalStatus(), b.speedOverGroundKnots(),
126 b.courseOverGroundDegrees(), b.headingDegrees(), b.aisClass());
127 } else {
128 Fix b = list.get(1);
129 long t = nextIntervalStartTime(startTime, intervalMs, b);
130 if (EffectiveSpeedChecker.effectiveSpeedOk(a.time(), a.lat(), a.lon(), b.time(),
131 b.lat(), b.lon(), SegmentOptions.getDefault())) {
132 FixImpl c = new FixImpl(b.mmsi(), b.lat(), b.lon(), t, b.latencySeconds(),
133 b.source(), b.navigationalStatus(), b.speedOverGroundKnots(),
134 b.courseOverGroundDegrees(), b.headingDegrees(), b.aisClass());
135 return c;
136 } else
137 return new FixImpl(b.mmsi(), b.lat(), b.lon(), t, b.latencySeconds(),
138 b.source(), b.navigationalStatus(), b.speedOverGroundKnots(),
139 b.courseOverGroundDegrees(), b.headingDegrees(), b.aisClass());
140 }
141 }
142 };
143 }
144
145 private static long nextIntervalStartTime(long startTime, long intervalMs, Fix a) {
146 return ((a.time() - startTime) / intervalMs + 1) * intervalMs + startTime;
147 }
148
149 public static void main(String[] args) {
150 Observable.range(1, 10).groupBy(n -> n % 2).flatMap(g -> g.map(t -> g.getKey() + ":" + t))
151 .subscribe(System.out::println);
152
153
154 File file = new File("/media/an/nmea/2014/NMEA_ITU_20140201.gz");
155 Observable<Fix> source = Streams.extractFixes(Streams.nmeaFromGzip(file));
156 final long startTime = 1391212800000L;
157 System.out.println(new Date(startTime));
158
159 final long intervalMs = TimeUnit.MINUTES.toMillis(5);
160
161 source.buffer(1000000).compose(MapWithIndex.<List<Fix>> instance()).take(1)
162
163 .concatMap(buffer -> Observable.from(buffer.value())
164
165 .toSortedList((a, b) -> (((Long) a.time()).compareTo(b.time())))
166
167 .concatMap(x -> Observable.from(x))
168
169 .groupBy(fix -> (fix.time() - startTime) / intervalMs)
170
171 .flatMap(
172
173 g -> g.groupBy(fix -> fix.mmsi())
174
175 .flatMap(g2 ->
176
177
178 g2.takeLast(2)
179
180
181
182 .toList()
183
184
185
186
187 .map(extrapolateToNext(startTime, intervalMs))))).cast(Fix.class)
188
189 .toSortedList((a, b) -> (((Long) a.time()).compareTo(b.time())))
190
191 .flatMapIterable(UtilityFunctions.identity())
192
193 .map(fix -> new Date(fix.time()) + " " + fix)
194
195 .subscribe(System.out::println);
196 }
197 }