1 package au.gov.amsa.navigation;
2
3 import static com.github.davidmoten.rtree.geometry.Geometries.rectangle;
4 import static java.lang.Math.cos;
5 import static java.lang.Math.toRadians;
6 import static rx.Observable.empty;
7 import static rx.Observable.from;
8 import static rx.Observable.just;
9
10 import java.util.List;
11 import java.util.TreeSet;
12 import java.util.concurrent.TimeUnit;
13
14 import com.github.davidmoten.rtree.Entry;
15 import com.github.davidmoten.rtree.geometry.Point;
16 import com.github.davidmoten.rtree.geometry.Rectangle;
17 import com.github.davidmoten.rx.slf4j.Logging;
18 import com.google.common.base.Optional;
19
20 import rx.Observable;
21 import rx.Observable.Transformer;
22 import rx.functions.Func1;
23 import rx.functions.Func2;
24 import rx.observables.GroupedObservable;
25
26 public class CollisionDetector {
27
28 private static final long MAX_TIME_INTERVAL_MS = TimeUnit.MINUTES.toMillis(5);
29 private static final double MAX_VESSEL_SPEED_METRES_PER_SECOND = 24;
30
31 private static final double LATITUDE_DELTA = 2 * MAX_TIME_INTERVAL_MS / 1000
32 * MAX_VESSEL_SPEED_METRES_PER_SECOND / (60 * 1852);
33
34
35
36
37 public Observable<CollisionCandidate> getCandidates(Observable<VesselPosition> o) {
38
39 return getCandidatesForAStream(o);
40
41
42
43
44
45
46
47 }
48
49 public static Transformer<VesselPosition, CollisionCandidate> detectCollisionCandidates() {
50 return o -> new CollisionDetector().getCandidates(o);
51 }
52
53 private static Func1<VesselPosition, Region> toRegion() {
54 return new Func1<VesselPosition, Region>() {
55
56 @Override
57 public Region call(VesselPosition p) {
58 double maxLat = 15;
59 double minLat = -50;
60 double minLon = -70;
61 double maxLon = 179;
62 int numRegions = Runtime.getRuntime().availableProcessors();
63 int x = (int) Math.floor((p.lon() - minLon) / (maxLon - minLon) * numRegions);
64 double lonCellSize = (maxLon - minLon) / numRegions;
65 double longitudeDelta;
66 if (Math.abs(minLat) > Math.abs(maxLat))
67 longitudeDelta = longitudeDelta(minLat);
68 else
69 longitudeDelta = longitudeDelta(maxLat);
70 return new Region(maxLat, minLon + x * lonCellSize - longitudeDelta, minLat,
71 minLon + (x + 1) * lonCellSize + longitudeDelta);
72 }
73 };
74 }
75
76 public static Observable<CollisionCandidate> getCandidatesForAStream(
77 Observable<VesselPosition> o) {
78
79
80 return Observable.defer(() -> o.scan(new State(), nextState())
81
82 .lift(Logging.<State> logger().showCount("positions")
83 .showRateSince("rate (pos/s)", TimeUnit.SECONDS.toMillis(10))
84 .showRateSinceStart("overall rate").every(10000).showValue()
85 .value(state -> "state.map.size=" + state.mapSize() + ", state.rtree.size="
86 + state.tree().size())
87 .log())
88
89
90 .flatMap(toCollisionCandidatesForPosition())
91
92 .groupBy(byIdPair())
93
94 .flatMap(onlyRepeating()));
95 }
96
97 private static Func2<State, VesselPosition, State> nextState() {
98 return (state, p) -> state.nextState(MAX_TIME_INTERVAL_MS, p);
99 }
100
101 private static Func1<State, Observable<CollisionCandidate>> toCollisionCandidatesForPosition() {
102 return state -> {
103 if (!state.last().isPresent())
104 return Observable.empty();
105 else {
106 return toCollisionCandidatesForPosition(state);
107 }
108 };
109 }
110
111 private static Observable<CollisionCandidate> toCollisionCandidatesForPosition(State state) {
112 final VesselPosition p = state.last().get();
113 final Optional<VesselPosition> next = state.nextPosition();
114
115
116
117
118
119
120 double longitudeDelta = longitudeDelta(p.lat());
121 Rectangle searchRegion = rectangle(p.lon() - longitudeDelta, p.lat() - LATITUDE_DELTA,
122 p.lon() + longitudeDelta, p.lat() + LATITUDE_DELTA);
123
124
125 Observable<VesselPosition> near = state.tree()
126
127 .search(searchRegion)
128
129 .map(toVesselPosition)
130
131 .filter(aroundInTime(p, MAX_TIME_INTERVAL_MS));
132
133 final Observable<TreeSet<VesselPosition>> othersByVessel = near
134
135
136 .filter(not(isVessel(p.id())))
137
138 .groupBy(byId())
139
140 .flatMap(toSortedSet());
141
142 Observable<CollisionCandidate> collisionCandidates = othersByVessel
143 .flatMap(toCollisionCandidates2(p, next));
144
145 return collisionCandidates;
146 }
147
148 private static <T> Func1<T, Boolean> not(final Func1<T, Boolean> f) {
149 return t -> !f.call(t);
150 }
151
152 private static double longitudeDelta(double lat) {
153 return LATITUDE_DELTA / cos(toRadians(lat));
154 }
155
156 private static Func1<GroupedObservable<IdentifierPair, CollisionCandidate>, Observable<? extends CollisionCandidate>> onlyRepeating() {
157 return g -> g.buffer(2).flatMap(isSmallTimePeriod());
158 }
159
160 private static Func1<List<CollisionCandidate>, Observable<CollisionCandidate>> isSmallTimePeriod() {
161 return list -> {
162 Optional<Long> min = Optional.absent();
163 Optional<Long> max = Optional.absent();
164 for (CollisionCandidate c : list) {
165 if (!min.isPresent() || c.position1().time() < min.get())
166 min = Optional.of(c.position1().time());
167 if (!max.isPresent() || c.position1().time() > max.get())
168 max = Optional.of(c.position1().time());
169 }
170 if (max.get() - min.get() < TimeUnit.MINUTES.toMillis(5))
171 return from(list);
172 else
173 return empty();
174 };
175 }
176
177 private static Func1<? super CollisionCandidate, IdentifierPair> byIdPair() {
178 return c -> new IdentifierPair(c.position1().id(), c.position2().id());
179 }
180
181 private static Func1<TreeSet<VesselPosition>, Observable<CollisionCandidate>> toCollisionCandidates2(
182 final VesselPosition p, final Optional<VesselPosition> next) {
183 return set -> {
184 Optional<VesselPosition> other = Optional.fromNullable(set.lower(p));
185 if (other.isPresent()) {
186 Optional<Times> times = p.intersectionTimes(other.get());
187 if (times.isPresent()) {
188 Optional<Long> tCollision = plus(times.get().leastPositive(), p.time());
189 if (tCollision.isPresent()
190 && tCollision.get() < p.time() + MAX_TIME_INTERVAL_MS) {
191 Optional<VesselPosition> otherNext = Optional
192 .fromNullable(set.higher(other.get()));
193 if (otherNext.isPresent() && otherNext.get().time() < tCollision.get())
194 return empty();
195 else if (next.isPresent() && next.get().time() < tCollision.get())
196 return empty();
197 else
198 return just(new CollisionCandidate(p, other.get(), tCollision.get()));
199 } else
200 return empty();
201 } else
202 return empty();
203 } else
204 return empty();
205 };
206 }
207
208 private static Optional<Long> plus(Optional<Long> a, long b) {
209 if (a.isPresent())
210 return Optional.of(a.get() + b);
211 else
212 return Optional.absent();
213 }
214
215 private static Func1<GroupedObservable<Identifier, VesselPosition>, Observable<TreeSet<VesselPosition>>> toSortedSet() {
216 return g -> g.toList().map(singleVesselPositions -> {
217 TreeSet<VesselPosition> set = new TreeSet<VesselPosition>(
218 Comparators.timeIdMessageIdComparator);
219 set.addAll(singleVesselPositions);
220 return set;
221 });
222 }
223
224 private static Func1<VesselPosition, Identifier> byId() {
225 return position -> position.id();
226 }
227
228 private static Func1<VesselPosition, Boolean> aroundInTime(final VesselPosition position,
229 final long maxTimeIntervalMs) {
230 return p -> Math.abs(p.time() - position.time()) <= maxTimeIntervalMs;
231 }
232
233 private static Func1<VesselPosition, Boolean> isVessel(final Identifier id) {
234 return p -> p.id().equals(id);
235 }
236
237 private static Func1<Entry<VesselPosition, Point>, VesselPosition> toVesselPosition = entry -> entry
238 .value();
239
240 }