View Javadoc
1   package au.gov.amsa.navigation;
2   
3   import static com.github.davidmoten.rtree.geometry.Geometries.rectangle;
4   import static java.lang.Math.cos;
5   import static java.lang.Math.toRadians;
6   import static rx.Observable.empty;
7   import static rx.Observable.from;
8   import static rx.Observable.just;
9   
10  import java.util.List;
11  import java.util.TreeSet;
12  import java.util.concurrent.TimeUnit;
13  
14  import com.github.davidmoten.rtree.Entry;
15  import com.github.davidmoten.rtree.geometry.Point;
16  import com.github.davidmoten.rtree.geometry.Rectangle;
17  import com.github.davidmoten.rx.slf4j.Logging;
18  import com.google.common.base.Optional;
19  
20  import rx.Observable;
21  import rx.Observable.Transformer;
22  import rx.functions.Func1;
23  import rx.functions.Func2;
24  import rx.observables.GroupedObservable;
25  
26  public class CollisionDetector {
27  
28      private static final long MAX_TIME_INTERVAL_MS = TimeUnit.MINUTES.toMillis(5);
29      private static final double MAX_VESSEL_SPEED_METRES_PER_SECOND = 24;
30  
31      private static final double LATITUDE_DELTA = 2 * MAX_TIME_INTERVAL_MS / 1000
32              * MAX_VESSEL_SPEED_METRES_PER_SECOND / (60 * 1852);
33  
34      // TODO use this?
35      // private static final long STEP_MS = TimeUnit.SECONDS.toMillis(1);
36  
37      public Observable<CollisionCandidate> getCandidates(Observable<VesselPosition> o) {
38  
39          return getCandidatesForAStream(o);
40  
41          // TODOs
42          // split the stream into multiple streams based on slightly overlapping
43          // geographic region (overlap is LATITUDE_DELTA and longitudeDelta(lat)
44          // in size) to enable concurrency
45          // .groupBy(toRegion()).flatMap(getCandidates());
46  
47      }
48  
49      public static Transformer<VesselPosition, CollisionCandidate> detectCollisionCandidates() {
50          return o -> new CollisionDetector().getCandidates(o);
51      }
52  
53      private static Func1<VesselPosition, Region> toRegion() {
54          return new Func1<VesselPosition, Region>() {
55  
56              @Override
57              public Region call(VesselPosition p) {
58                  double maxLat = 15;
59                  double minLat = -50;
60                  double minLon = -70;
61                  double maxLon = 179;
62                  int numRegions = Runtime.getRuntime().availableProcessors();
63                  int x = (int) Math.floor((p.lon() - minLon) / (maxLon - minLon) * numRegions);
64                  double lonCellSize = (maxLon - minLon) / numRegions;
65                  double longitudeDelta;
66                  if (Math.abs(minLat) > Math.abs(maxLat))
67                      longitudeDelta = longitudeDelta(minLat);
68                  else
69                      longitudeDelta = longitudeDelta(maxLat);
70                  return new Region(maxLat, minLon + x * lonCellSize - longitudeDelta, minLat,
71                          minLon + (x + 1) * lonCellSize + longitudeDelta);
72              }
73          };
74      }
75  
76      public static Observable<CollisionCandidate> getCandidatesForAStream(
77              Observable<VesselPosition> o) {
78          // make a window of recent positions indexed spatially
79  
80          return Observable.defer(() -> o.scan(new State(), nextState())
81                  // log
82                  .lift(Logging.<State> logger().showCount("positions")
83                          .showRateSince("rate (pos/s)", TimeUnit.SECONDS.toMillis(10))
84                          .showRateSinceStart("overall rate").every(10000).showValue()
85                          .value(state -> "state.map.size=" + state.mapSize() + ", state.rtree.size="
86                                  + state.tree().size())
87                          .log())
88                  // report collision candidates from each window for the latest
89                  // reported position
90                  .flatMap(toCollisionCandidatesForPosition())
91                  // group by id of first candidate
92                  .groupBy(byIdPair())
93                  // only show if repeated
94                  .flatMap(onlyRepeating()));
95      }
96  
97      private static Func2<State, VesselPosition, State> nextState() {
98          return (state, p) -> state.nextState(MAX_TIME_INTERVAL_MS, p);
99      }
100 
101     private static Func1<State, Observable<CollisionCandidate>> toCollisionCandidatesForPosition() {
102         return state -> {
103             if (!state.last().isPresent())
104                 return Observable.empty();
105             else {
106                 return toCollisionCandidatesForPosition(state);
107             }
108         };
109     }
110 
111     private static Observable<CollisionCandidate> toCollisionCandidatesForPosition(State state) {
112         final VesselPosition p = state.last().get();
113         final Optional<VesselPosition> next = state.nextPosition();
114 
115         // use the spatial index to get positions physically near the latest
116         // position report
117 
118         // setup a region around the latest position report to search with a
119         // decent delta).
120         double longitudeDelta = longitudeDelta(p.lat());
121         Rectangle searchRegion = rectangle(p.lon() - longitudeDelta, p.lat() - LATITUDE_DELTA,
122                 p.lon() + longitudeDelta, p.lat() + LATITUDE_DELTA);
123 
124         // find nearby vessels within time constraints and cache them
125         Observable<VesselPosition> near = state.tree()
126                 // search the R-tree
127                 .search(searchRegion)
128                 // get just the vessel position
129                 .map(toVesselPosition)
130                 // only accept positions with time close to p
131                 .filter(aroundInTime(p, MAX_TIME_INTERVAL_MS));
132 
133         final Observable<TreeSet<VesselPosition>> othersByVessel = near
134                 // only those vessels with different id as latest position
135                 // report
136                 .filter(not(isVessel(p.id())))
137                 // group by individual vessel
138                 .groupBy(byId())
139                 // sort the positions by time
140                 .flatMap(toSortedSet());
141 
142         Observable<CollisionCandidate> collisionCandidates = othersByVessel
143                 .flatMap(toCollisionCandidates2(p, next));
144 
145         return collisionCandidates;
146     }
147 
148     private static <T> Func1<T, Boolean> not(final Func1<T, Boolean> f) {
149         return t -> !f.call(t);
150     }
151 
152     private static double longitudeDelta(double lat) {
153         return LATITUDE_DELTA / cos(toRadians(lat));
154     }
155 
156     private static Func1<GroupedObservable<IdentifierPair, CollisionCandidate>, Observable<? extends CollisionCandidate>> onlyRepeating() {
157         return g -> g.buffer(2).flatMap(isSmallTimePeriod());
158     }
159 
160     private static Func1<List<CollisionCandidate>, Observable<CollisionCandidate>> isSmallTimePeriod() {
161         return list -> {
162             Optional<Long> min = Optional.absent();
163             Optional<Long> max = Optional.absent();
164             for (CollisionCandidate c : list) {
165                 if (!min.isPresent() || c.position1().time() < min.get())
166                     min = Optional.of(c.position1().time());
167                 if (!max.isPresent() || c.position1().time() > max.get())
168                     max = Optional.of(c.position1().time());
169             }
170             if (max.get() - min.get() < TimeUnit.MINUTES.toMillis(5))
171                 return from(list);
172             else
173                 return empty();
174         };
175     }
176 
177     private static Func1<? super CollisionCandidate, IdentifierPair> byIdPair() {
178         return c -> new IdentifierPair(c.position1().id(), c.position2().id());
179     }
180 
181     private static Func1<TreeSet<VesselPosition>, Observable<CollisionCandidate>> toCollisionCandidates2(
182             final VesselPosition p, final Optional<VesselPosition> next) {
183         return set -> {
184             Optional<VesselPosition> other = Optional.fromNullable(set.lower(p));
185             if (other.isPresent()) {
186                 Optional<Times> times = p.intersectionTimes(other.get());
187                 if (times.isPresent()) {
188                     Optional<Long> tCollision = plus(times.get().leastPositive(), p.time());
189                     if (tCollision.isPresent()
190                             && tCollision.get() < p.time() + MAX_TIME_INTERVAL_MS) {
191                         Optional<VesselPosition> otherNext = Optional
192                                 .fromNullable(set.higher(other.get()));
193                         if (otherNext.isPresent() && otherNext.get().time() < tCollision.get())
194                             return empty();
195                         else if (next.isPresent() && next.get().time() < tCollision.get())
196                             return empty();
197                         else
198                             return just(new CollisionCandidate(p, other.get(), tCollision.get()));
199                     } else
200                         return empty();
201                 } else
202                     return empty();
203             } else
204                 return empty();
205         };
206     }
207 
208     private static Optional<Long> plus(Optional<Long> a, long b) {
209         if (a.isPresent())
210             return Optional.of(a.get() + b);
211         else
212             return Optional.absent();
213     }
214 
215     private static Func1<GroupedObservable<Identifier, VesselPosition>, Observable<TreeSet<VesselPosition>>> toSortedSet() {
216         return g -> g.toList().map(singleVesselPositions -> {
217             TreeSet<VesselPosition> set = new TreeSet<VesselPosition>(
218                     Comparators.timeIdMessageIdComparator);
219             set.addAll(singleVesselPositions);
220             return set;
221         });
222     }
223 
224     private static Func1<VesselPosition, Identifier> byId() {
225         return position -> position.id();
226     }
227 
228     private static Func1<VesselPosition, Boolean> aroundInTime(final VesselPosition position,
229             final long maxTimeIntervalMs) {
230         return p -> Math.abs(p.time() - position.time()) <= maxTimeIntervalMs;
231     }
232 
233     private static Func1<VesselPosition, Boolean> isVessel(final Identifier id) {
234         return p -> p.id().equals(id);
235     }
236 
237     private static Func1<Entry<VesselPosition, Point>, VesselPosition> toVesselPosition = entry -> entry
238             .value();
239 
240 }