1 package au.gov.amsa.geo.distance;
2
3 import java.io.File;
4 import java.io.FileNotFoundException;
5 import java.io.IOException;
6 import java.io.PrintWriter;
7 import java.util.ArrayList;
8 import java.util.Comparator;
9 import java.util.HashMap;
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Map.Entry;
13 import java.util.concurrent.atomic.AtomicLong;
14
15 import org.apache.log4j.Logger;
16
17 import com.github.davidmoten.rx.slf4j.Logging;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Preconditions;
21
22 import au.gov.amsa.geo.model.Bounds;
23 import au.gov.amsa.geo.model.Cell;
24 import au.gov.amsa.geo.model.CellValue;
25 import au.gov.amsa.geo.model.GridTraversor;
26 import au.gov.amsa.geo.model.Options;
27 import au.gov.amsa.geo.model.SegmentOptions;
28 import au.gov.amsa.geo.model.Util;
29 import au.gov.amsa.risky.format.BinaryFixes;
30 import au.gov.amsa.risky.format.Fix;
31 import au.gov.amsa.risky.format.HasPosition;
32 import au.gov.amsa.util.navigation.Position;
33 import rx.Observable;
34 import rx.Observable.OnSubscribe;
35 import rx.Observer;
36 import rx.Subscriber;
37 import rx.functions.Action1;
38 import rx.functions.Action2;
39 import rx.functions.Func0;
40 import rx.functions.Func1;
41 import rx.schedulers.Schedulers;
42 import ucar.ma2.Array;
43 import ucar.ma2.ArrayDouble;
44 import ucar.ma2.DataType;
45 import ucar.ma2.Index2D;
46 import ucar.ma2.InvalidRangeException;
47 import ucar.nc2.Attribute;
48 import ucar.nc2.Dimension;
49 import ucar.nc2.NetcdfFileWriter;
50 import ucar.nc2.Variable;
51
52 public class DistanceTravelledCalculator {
53
54 private static Logger log = Logger.getLogger(DistanceTravelledCalculator.class);
55
56 private final Options options;
57 private final DistanceCalculationMetrics metrics;
58
59 public DistanceTravelledCalculator(Options options, DistanceCalculationMetrics metrics) {
60 this.options = options;
61 this.metrics = metrics;
62 }
63
64
65
66
67
68
69
70
71
72
73
74 public Observable<CellAndDistance> calculateDistanceByCellFromFiles(Observable<File> files) {
75
76
77
78
79 int numFiles = files.count().toBlocking().single();
80 log.info("numFiles=" + numFiles);
81 AtomicLong fileCount = new AtomicLong();
82 AtomicLong cellCount = new AtomicLong(1);
83 return files
84
85 .buffer(Math.max(1,
86 (int) Math.round(
87 Math.ceil(numFiles / Runtime.getRuntime().availableProcessors()))))
88 .flatMap(fileList -> extractCellDistances(fileCount, cellCount, fileList))
89
90 .collect(bigMapFactory(), collectCellDistances())
91
92 .flatMap(listCellDistances())
93
94 .doOnNext(sumNauticalMiles());
95 }
96
97 private Func1<? super HashMap<Cell, Double>, Observable<CellAndDistance>> listCellDistances() {
98 return map -> Observable.from(map.entrySet())
99 .map(entry -> new CellAndDistance(entry.getKey(), entry.getValue()));
100 }
101
102 private Func0<HashMap<Cell, Double>> bigMapFactory() {
103 return () -> new HashMap<Cell, Double>(20_000_000, 1.0f);
104 }
105
106 private Action2<HashMap<Cell, Double>, Map<Cell, Double>> collectCellDistances() {
107 return (a, b) -> {
108
109 long t = System.currentTimeMillis();
110 log.info("reducing");
111 for (Entry<Cell, Double> entry : b.entrySet()) {
112 Double val = a.putIfAbsent(entry.getKey(), entry.getValue());
113 if (val != null) {
114 a.put(entry.getKey(), val + entry.getValue());
115 }
116 }
117 log.info("reduced in " + (System.currentTimeMillis() - t) + "ms");
118 };
119 }
120
121 private Observable<Map<Cell, Double>> extractCellDistances(AtomicLong fileCount,
122 AtomicLong cellCount, List<File> fileList) {
123 return
124 Observable.from(fileList)
125 .lift(Logging.<File> logger().showCount(fileCount).every(1000).log())
126 .map(file -> BinaryFixes.from(file))
127
128
129
130
131
132 .flatMap(toCraftCellAndDistances)
133
134 .lift(Logging.<CellAndDistance> logger().showCount("cellsReceived", cellCount)
135 .every(1_000_000).showMemory().log())
136
137
138 .lift(OperatorSumCellDistances.create(1_000_000))
139 .subscribeOn(Schedulers.computation());
140 }
141
142 private final Func1<Observable<Fix>, Observable<CellAndDistance>> toCraftCellAndDistances = new Func1<Observable<Fix>, Observable<CellAndDistance>>() {
143
144 @Override
145 public Observable<CellAndDistance> call(Observable<Fix> allFixesForASingleCraft) {
146
147 return allFixesForASingleCraft
148
149 .doOnNext(incrementFixesCount)
150
151 .filter(inTimeRange)
152
153 .filter(inRegion)
154
155
156
157
158
159 .lift(filterOnEffectiveSpeedOk())
160
161 .filter(check -> check.isOk())
162
163 .map(check -> check.fix())
164
165 .doOnNext(countFixesPassedEffectiveSpeedCheck)
166
167 .buffer(2, 1)
168
169 .filter(PAIRS_ONLY)
170
171 .doOnNext(segment -> metrics.segments.incrementAndGet())
172
173 .filter(timeDifferenceOk)
174
175 .filter(distanceOk)
176
177 .flatMap(toCellAndDistance)
178
179 .doOnNext(countSegmentCells)
180
181 .onBackpressureBuffer();
182 }
183
184 };
185
186 private OperatorEffectiveSpeedChecker filterOnEffectiveSpeedOk() {
187 return new OperatorEffectiveSpeedChecker(options.getSegmentOptions());
188 }
189
190 private final Func1<Fix, Boolean> inRegion = new Func1<Fix, Boolean>() {
191 @Override
192 public Boolean call(Fix fix) {
193 boolean in = options.getFilterBounds().contains(fix);
194 if (in)
195 metrics.fixesWithinRegion.incrementAndGet();
196 return in;
197 }
198 };
199
200 private final Func1<Fix, Boolean> inTimeRange = new Func1<Fix, Boolean>() {
201 @Override
202 public Boolean call(Fix fix) {
203
204 boolean lowerBoundOk = !options.getStartTime().isPresent()
205 || fix.time() >= options.getStartTime().get();
206 boolean upperBoundOk = !options.getFinishTime().isPresent()
207 || fix.time() < options.getFinishTime().get();
208 boolean result = lowerBoundOk && upperBoundOk;
209 if (result)
210 metrics.fixesInTimeRange.incrementAndGet();
211 return result;
212 }
213 };
214
215 private final Func1<List<Fix>, Boolean> timeDifferenceOk = new Func1<List<Fix>, Boolean>() {
216 @Override
217 public Boolean call(List<Fix> pair) {
218 Preconditions.checkArgument(pair.size() == 2);
219 Fix a = pair.get(0);
220 Fix b = pair.get(1);
221 boolean ok = timeDifferenceOk(a, b, options.getSegmentOptions());
222 if (ok)
223 metrics.segmentsTimeDifferenceOk.incrementAndGet();
224 return ok;
225 }
226 };
227
228 private final Func1<List<Fix>, Boolean> distanceOk = new Func1<List<Fix>, Boolean>() {
229 @Override
230 public Boolean call(List<Fix> pair) {
231 Preconditions.checkArgument(pair.size() == 2);
232 Fix a = pair.get(0);
233 Fix b = pair.get(1);
234 boolean ok = distanceOk(a, b, options.getSegmentOptions());
235 if (ok)
236 metrics.segmentsDistanceOk.incrementAndGet();
237 return ok;
238 }
239 };
240
241 private static boolean timeDifferenceOk(Fix a, Fix b, SegmentOptions o) {
242 long timeDiffMs = Math.abs(a.time() - b.time());
243 return o.maxTimePerSegmentMs() == null || timeDiffMs <= o.maxTimePerSegmentMs();
244 }
245
246 private static boolean distanceOk(Fix a, Fix b, SegmentOptions o) {
247 return o.maxDistancePerSegmentNm() > Util.greatCircleDistanceNm(a, b);
248 }
249
250 private static final Func1<List<Fix>, Boolean> PAIRS_ONLY = new Func1<List<Fix>, Boolean>() {
251
252 @Override
253 public Boolean call(List<Fix> list) {
254 return list.size() == 2;
255 }
256 };
257
258 private final Func1<List<Fix>, Observable<CellAndDistance>> toCellAndDistance = new Func1<List<Fix>, Observable<CellAndDistance>>() {
259
260 @Override
261 public Observable<CellAndDistance> call(List<Fix> pair) {
262 Preconditions.checkArgument(pair.size() == 2);
263 Fix fix1 = pair.get(0);
264 Fix fix2 = pair.get(1);
265 return getCellDistances(fix1, fix2, options);
266 }
267
268 };
269
270 private final Action1<CellAndDistance> countSegmentCells = new Action1<CellAndDistance>() {
271
272 @Override
273 public void call(CellAndDistance cd) {
274 metrics.segmentCells.incrementAndGet();
275 }
276
277 };
278
279 @VisibleForTesting
280 static final Observable<CellAndDistance> getCellDistances(HasPosition a, HasPosition b,
281 Options options) {
282 return getCellDistances(Util.toPos(a), Util.toPos(b), options);
283 }
284
285 @VisibleForTesting
286 static final Observable<CellAndDistance> getCellDistances(final Position a, final Position b,
287 final Options options) {
288
289 return Observable.create(new OnSubscribe<CellAndDistance>() {
290
291 @Override
292 public void call(Subscriber<? super CellAndDistance> subscriber) {
293 try {
294 GridTraversor grid = new GridTraversor(options);
295 boolean keepGoing = true;
296 Position p1 = a;
297 Position destination = b;
298 int count = 0;
299 while (keepGoing) {
300 Position p2 = grid.nextPoint(p1, destination);
301 double distanceNm = p1.getDistanceToKm(p2) / 1.852;
302
303 Optional<Cell> cell = Cell.cellAt(p1.getLat(), p1.getLon(), options);
304 if (cell.isPresent())
305 subscriber.onNext(new CellAndDistance(cell.get(), distanceNm));
306 keepGoing = p2.getLat() != destination.getLat()
307 || p2.getLon() != destination.getLon();
308 keepGoing = keepGoing && !subscriber.isUnsubscribed();
309 p1 = p2;
310 count++;
311 checkCount(p1, destination, count, options);
312 }
313 subscriber.onCompleted();
314 } catch (Throwable t) {
315
316
317 log.warn(t.getMessage(), t);
318 subscriber.onCompleted();
319
320 }
321 }
322
323 });
324 }
325
326 private static void checkCount(Position p1, Position destination, int count, Options options) {
327 if (count > 100000)
328 throw new RuntimeException("unexpectedly stuck in loop p1=" + p1 + ",destination="
329 + destination + ",options=" + options);
330 }
331
332 public DistanceCalculationMetrics getMetrics() {
333 return metrics;
334 }
335
336 public static class CalculationResult {
337 private final Observable<CellValue> cells;
338 private final DistanceCalculationMetrics metrics;
339
340 public CalculationResult(Observable<CellValue> cells, DistanceCalculationMetrics metrics) {
341 this.cells = cells;
342 this.metrics = metrics;
343 }
344
345 public Observable<CellValue> getCells() {
346 return cells;
347 }
348
349 public DistanceCalculationMetrics getMetrics() {
350 return metrics;
351 }
352
353 }
354
355 public static Observable<CellValue> calculateDensityByCellFromFiles(Options options,
356 Observable<File> files, int horizontal, int vertical,
357 DistanceCalculationMetrics metrics) {
358 Observable<CellValue> cells = partition(options, horizontal, vertical)
359
360 .concatMap(calculateDistanceTravelled(files, metrics));
361
362 if (horizontal > 1 || vertical > 1)
363
364 return cells.lift(new OperatorSumCellValues(true));
365 else
366 return cells;
367 }
368
369 private static Func1<Options, Observable<CellValue>> calculateDistanceTravelled(
370 final Observable<File> files, final DistanceCalculationMetrics metrics) {
371 return new Func1<Options, Observable<CellValue>>() {
372
373 @Override
374 public Observable<CellValue> call(Options options) {
375 log.info("running distance calculation on " + options);
376 DistanceTravelledCalculator c = new DistanceTravelledCalculator(options, metrics);
377
378
379 return Observable.from(c.calculateDistanceByCellFromFiles(files)
380
381 .map(toCellDensityValue(options))
382
383 .toList()
384
385 .toBlocking().single());
386 }
387 };
388 }
389
390 public static CalculationResult calculateTrafficDensity(Options options,
391 Observable<File> files) {
392 return calculateTrafficDensity(options, files, 1, 1);
393 }
394
395 public static CalculationResult calculateTrafficDensity(Options options, Observable<File> files,
396 int horizontal, int vertical) {
397 int maxNumCells = (int) Math.round(options.getBounds().getWidthDegrees()
398 * options.getBounds().getHeightDegrees() / options.getCellSizeDegreesAsDouble()
399 / options.getCellSizeDegreesAsDouble());
400 log.info("maxNumCells=" + maxNumCells);
401 DistanceCalculationMetrics metrics = new DistanceCalculationMetrics();
402 final Observable<CellValue> cells = DistanceTravelledCalculator
403 .calculateDensityByCellFromFiles(options, files, horizontal, vertical, metrics);
404 return new CalculationResult(cells, metrics);
405 }
406
407 private static Func1<CellAndDistance, CellValue> toCellDensityValue(final Options options) {
408 return new Func1<CellAndDistance, CellValue>() {
409
410 @Override
411 public CellValue call(CellAndDistance cd) {
412 return new CellValue(cd.getCell().getCentreLat(options),
413 cd.getCell().getCentreLon(options), cd.getTrafficDensity(options));
414 }
415 };
416 }
417
418 public static void saveCalculationResultAsText(Options options,
419 CalculationResult calculationResult, String filename) {
420 try {
421 final PrintWriter out = new PrintWriter(filename);
422 Bounds b = options.getBounds();
423 out.println(
424 "#originLat, originLon, cellSizeDegrees, topLefLat, topLeftLon, bottomRightLat, bottomRightLon");
425 out.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\n", options.getOriginLat(),
426 options.getOriginLon(), options.getCellSizeDegrees(), b.getTopLeftLat(),
427 b.getTopLeftLon(), b.getBottomRightLat(), b.getBottomRightLon());
428 out.println("#centreLat, centreLon, distanceNmPerNm2");
429
430 calculationResult.getCells().subscribe(new Observer<CellValue>() {
431
432 @Override
433 public void onCompleted() {
434 out.close();
435 }
436
437 @Override
438 public void onError(Throwable e) {
439 out.close();
440 }
441
442 @Override
443 public void onNext(CellValue cell) {
444 out.format("%s\t%s\t%s\n", cell.getCentreLat(), cell.getCentreLon(),
445 cell.getValue());
446 }
447 });
448
449 } catch (FileNotFoundException e) {
450 throw new RuntimeException(e);
451 }
452 }
453
454 public static void saveCalculationResultAsNetcdf(Options options,
455 CalculationResult calculationResult, String filename) {
456
457 List<CellValue> list = calculationResult.getCells().toList().toBlocking().single();
458
459 int maxLonIndex = list.stream()
460 .map(cell -> options.getGrid().cellAt(cell.getCentreLat(), cell.getCentreLon()))
461 .filter(x -> x.isPresent())
462 .map(x -> x.get().getLonIndex())
463 .max(Comparator.<Long> naturalOrder())
464 .get().intValue();
465 int maxLatIndex = list.stream()
466 .map(cell -> options.getGrid().cellAt(cell.getCentreLat(), cell.getCentreLon()))
467 .filter(x -> x.isPresent()).map(x -> x.get().getLatIndex())
468 .max(Comparator.<Long> naturalOrder()).get().intValue();
469
470 File file = new File(filename);
471
472
473 NetcdfFileWriter f = null;
474 try {
475
476 f = NetcdfFileWriter.createNew(NetcdfFileWriter.Version.netcdf3, file.getPath());
477
478
479
480
481
482
483 Dimension dimLat = f.addDimension(null, "latitude", maxLatIndex + 1);
484 Dimension dimLon = f.addDimension(null, "longitude", maxLonIndex + 1);
485
486 List<Dimension> dims = new ArrayList<Dimension>();
487 dims.add(dimLat);
488 dims.add(dimLon);
489
490
491 Variable vLat = f.addVariable(null, "latitude", DataType.DOUBLE, "latitude");
492 Variable vLon = f.addVariable(null, "longitude", DataType.DOUBLE, "longitude");
493
494
495 Variable vDensity = f.addVariable(null, "traffic_density", DataType.DOUBLE, dims);
496
497
498
499
500
501 vLon.addAttribute(new Attribute("units", "degrees_east"));
502 vLat.addAttribute(new Attribute("units", "degrees_north"));
503
504
505 vDensity.addAttribute(new Attribute("units", "nm-1"));
506 vDensity.addAttribute(new Attribute("long_name", ""));
507
508
509
510 f.create();
511 {
512 Array dataLat = Array.factory(DataType.DOUBLE, new int[] { dimLat.getLength() });
513 Array dataLon = Array.factory(DataType.DOUBLE, new int[] { dimLon.getLength() });
514
515
516 for (int i = 0; i <= maxLatIndex; i++) {
517 dataLat.setDouble(i, options.getGrid().centreLat(i));
518 }
519
520
521 for (int i = 0; i <= maxLonIndex; i++) {
522 dataLon.setDouble(i, options.getGrid().centreLon(i));
523 }
524
525 f.write(vLat, dataLat);
526 f.write(vLon, dataLon);
527 }
528
529
530 {
531 int[] iDim = new int[] { dimLat.getLength(), dimLon.getLength() };
532 Array dataDensity = ArrayDouble.D2.factory(DataType.DOUBLE, iDim);
533
534 Index2D idx = new Index2D(iDim);
535
536 for (CellValue point : list) {
537 Optional<Cell> cell = options.getGrid().cellAt(point.getCentreLat(),
538 point.getCentreLon());
539 if (cell.isPresent()) {
540 idx.set((int) cell.get().getLatIndex(), (int) cell.get().getLonIndex());
541 dataDensity.setDouble(idx, point.getValue());
542 }
543 }
544 f.write(vDensity, dataDensity);
545 }
546 } catch (IOException | InvalidRangeException e) {
547 throw new RuntimeException(e);
548 } finally {
549 if (f != null) {
550 try {
551 f.close();
552 } catch (IOException ioe) {
553 throw new RuntimeException(ioe);
554 }
555 }
556 }
557 }
558
559 static List<Double> makeConstantDifference(List<Double> list) {
560 List<Double> result = new ArrayList<>();
561 double diff = list.get(1) - list.get(0);
562 Double previous = null;
563 for (int i = 0; i < list.size(); i++) {
564 double next;
565 if (previous == null) {
566 next = list.get(0);
567 } else {
568 next = previous + diff;
569 }
570 result.add(next);
571 previous = next;
572 }
573 return result;
574 }
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603 private Action1<CellAndDistance> sumNauticalMiles() {
604 return new Action1<CellAndDistance>() {
605 @Override
606 public void call(CellAndDistance cell) {
607 metrics.totalNauticalMiles.addAndGet(cell.getDistanceNm());
608 }
609 };
610 }
611
612 private final Action1<? super Fix> incrementFixesCount = new Action1<Fix>() {
613
614 @Override
615 public void call(Fix fix) {
616 metrics.fixes.incrementAndGet();
617 }
618 };
619
620 private final Action1<? super Fix> countFixesPassedEffectiveSpeedCheck = new Action1<Fix>() {
621
622 @Override
623 public void call(Fix fix) {
624 metrics.fixesPassedEffectiveSpeedCheck.incrementAndGet();
625 }
626 };
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642 public static Observable<Options> partition(final Options options, final int horizontal,
643 final int vertical) {
644 List<Options> list = new ArrayList<>();
645 Bounds bounds = options.getBounds();
646 double h = bounds.getWidthDegrees() / horizontal;
647 double v = bounds.getHeightDegrees() / vertical;
648 for (int i = 0; i < horizontal; i++) {
649 for (int j = 0; j < vertical; j++) {
650 double lat = bounds.getTopLeftLat() - j * v;
651 double lon = bounds.getTopLeftLon() + i * h;
652 Bounds b = new Bounds(lat, lon, lat - v, lon + h);
653 list.add(options.buildFrom().bounds(b).filterBounds(b.expand(7, 7)).build());
654 }
655 }
656 return Observable.from(list);
657 }
658
659 }