View Javadoc
1   package au.gov.amsa.animator;
2   
3   import static au.gov.amsa.geo.distance.EffectiveSpeedChecker.effectiveSpeedOk;
4   
5   import java.io.File;
6   import java.util.Collection;
7   import java.util.Date;
8   import java.util.List;
9   import java.util.Map;
10  import java.util.Queue;
11  import java.util.concurrent.ConcurrentHashMap;
12  import java.util.concurrent.ConcurrentLinkedQueue;
13  import java.util.concurrent.TimeUnit;
14  
15  import com.github.davidmoten.util.MapWithIndex;
16  
17  import au.gov.amsa.ais.rx.Streams;
18  import au.gov.amsa.geo.distance.EffectiveSpeedChecker;
19  import au.gov.amsa.geo.model.SegmentOptions;
20  import au.gov.amsa.risky.format.Fix;
21  import au.gov.amsa.risky.format.FixImpl;
22  import rx.Observable;
23  import rx.Subscriber;
24  import rx.functions.Func1;
25  import rx.internal.util.UtilityFunctions;
26  import rx.schedulers.Schedulers;
27  
28  public class ModelManyCraft implements Model {
29  
30      private final FixesSubscriber subscriber;
31      private final int fixesPerModelStep;
32      private volatile long stepNumber;
33  
34      public ModelManyCraft(Observable<Fix> fixes, int fixesPerModelStep) {
35          this.fixesPerModelStep = fixesPerModelStep;
36          this.subscriber = new FixesSubscriber();
37          Observable<Fix> source = fixes
38                  // cache for repeat
39                  .cache()
40                  //
41                  .doOnCompleted(() -> subscriber.reset())
42                  // repeat stream
43                  .repeat()
44                  // log
45                  .doOnCompleted(() -> {
46                      System.out.println("completed");
47                  });
48          source.subscribeOn(Schedulers.io()).subscribe(subscriber);
49      }
50  
51      @Override
52      public void updateModel(long stepNumber) {
53          this.stepNumber = stepNumber;
54          subscriber.requestMore(fixesPerModelStep);
55      }
56  
57      @SuppressWarnings("unchecked")
58      @Override
59      public Map<Integer, Collection<Fix>> recent() {
60          return (Map<Integer, Collection<Fix>>) ((Map<Integer, ?>) subscriber.queues);
61      }
62  
63      @Override
64      public long stepNumber() {
65          return stepNumber;
66      }
67  
68      private static class FixesSubscriber extends Subscriber<Fix> {
69  
70          private final ConcurrentHashMap<Integer, Queue<Fix>> queues = new ConcurrentHashMap<>();
71          private final ConcurrentHashMap<Integer, Fix> lastFix = new ConcurrentHashMap<>();
72          private final int maxSize = 10;
73          private final SegmentOptions options = SegmentOptions.builder().build();
74  
75          synchronized void reset() {
76              queues.clear();
77              lastFix.clear();
78          }
79  
80          @Override
81          public void onStart() {
82              request(0);
83          }
84  
85          public void requestMore(long n) {
86              request(n);
87          }
88  
89          @Override
90          public void onCompleted() {
91              System.out.println("finished");
92          }
93  
94          @Override
95          public void onError(Throwable e) {
96              e.printStackTrace();
97          }
98  
99          @Override
100         public void onNext(Fix f) {
101             Queue<Fix> queue = queues.computeIfAbsent(f.mmsi(),
102                     mmsi -> new ConcurrentLinkedQueue<Fix>());
103             if (queue.size() == maxSize)
104                 queue.poll();
105             Fix last = lastFix.get(f.mmsi());
106             if (last == null || f.time() >= last.time() + 600000 && effectiveSpeedOk(last.time(),
107                     last.lat(), last.lon(), f.time(), f.lat(), f.lon(), options)) {
108                 queue.add(f);
109                 lastFix.put(f.mmsi(), f);
110             }
111         }
112     }
113 
114     private static Func1<List<Fix>, Fix> extrapolateToNext(long startTime, long intervalMs) {
115         return list -> {
116             if (list.size() == 0)
117                 throw new RuntimeException("unexpected");
118             else {
119                 Fix a = list.get(0);
120 
121                 if (list.size() == 1) {
122                     Fix b = a;
123                     long t = nextIntervalStartTime(startTime, intervalMs, a);
124                     return new FixImpl(b.mmsi(), b.lat(), b.lon(), t, b.latencySeconds(),
125                             b.source(), b.navigationalStatus(), b.speedOverGroundKnots(),
126                             b.courseOverGroundDegrees(), b.headingDegrees(), b.aisClass());
127                 } else {
128                     Fix b = list.get(1);
129                     long t = nextIntervalStartTime(startTime, intervalMs, b);
130                     if (EffectiveSpeedChecker.effectiveSpeedOk(a.time(), a.lat(), a.lon(), b.time(),
131                             b.lat(), b.lon(), SegmentOptions.getDefault())) {
132                         FixImpl c = new FixImpl(b.mmsi(), b.lat(), b.lon(), t, b.latencySeconds(),
133                                 b.source(), b.navigationalStatus(), b.speedOverGroundKnots(),
134                                 b.courseOverGroundDegrees(), b.headingDegrees(), b.aisClass());
135                         return c;
136                     } else
137                         return new FixImpl(b.mmsi(), b.lat(), b.lon(), t, b.latencySeconds(),
138                                 b.source(), b.navigationalStatus(), b.speedOverGroundKnots(),
139                                 b.courseOverGroundDegrees(), b.headingDegrees(), b.aisClass());
140                 }
141             }
142         };
143     }
144 
145     private static long nextIntervalStartTime(long startTime, long intervalMs, Fix a) {
146         return ((a.time() - startTime) / intervalMs + 1) * intervalMs + startTime;
147     }
148 
149     public static void main(String[] args) {
150         Observable.range(1, 10).groupBy(n -> n % 2).flatMap(g -> g.map(t -> g.getKey() + ":" + t))
151                 .subscribe(System.out::println);
152         // System.exit(0);
153 
154         File file = new File("/media/an/nmea/2014/NMEA_ITU_20140201.gz");
155         Observable<Fix> source = Streams.extractFixes(Streams.nmeaFromGzip(file));
156         final long startTime = 1391212800000L;
157         System.out.println(new Date(startTime));
158 
159         final long intervalMs = TimeUnit.MINUTES.toMillis(5);
160 
161         source.buffer(1000000).compose(MapWithIndex.<List<Fix>> instance()).take(1)
162                 //
163                 .concatMap(buffer -> Observable.from(buffer.value())
164                         // sort by time
165                         .toSortedList((a, b) -> (((Long) a.time()).compareTo(b.time())))
166                         // flatten
167                         .concatMap(x -> Observable.from(x))
168                         // group by timestep
169                         .groupBy(fix -> (fix.time() - startTime) / intervalMs)
170                         //
171                         .flatMap(
172                                 // within each timestep group by mmsi
173                                 g -> g.groupBy(fix -> fix.mmsi())
174                                         //
175                                         .flatMap(g2 -> // take the last
176                                                        // two at the end
177                                                        // of the timestep
178         g2.takeLast(2)
179                 // convert to a
180                 // list of 1 or
181                 // 2 items
182                 .toList()
183                 // predict
184                 // position at
185                 // end of
186                 // timestep
187                 .map(extrapolateToNext(startTime, intervalMs))))).cast(Fix.class)
188                 // sort by time
189                 .toSortedList((a, b) -> (((Long) a.time()).compareTo(b.time())))
190                 // flatten
191                 .flatMapIterable(UtilityFunctions.identity())
192                 // to string
193                 .map(fix -> new Date(fix.time()) + " " + fix)
194                 // go
195                 .subscribe(System.out::println);
196     }
197 }