View Javadoc
1   package au.gov.amsa.geo.distance;
2   
3   import java.io.File;
4   import java.io.FileNotFoundException;
5   import java.io.IOException;
6   import java.io.PrintWriter;
7   import java.util.ArrayList;
8   import java.util.Comparator;
9   import java.util.HashMap;
10  import java.util.List;
11  import java.util.Map;
12  import java.util.Map.Entry;
13  import java.util.concurrent.atomic.AtomicLong;
14  
15  import org.apache.log4j.Logger;
16  
17  import com.github.davidmoten.rx.slf4j.Logging;
18  import com.google.common.annotations.VisibleForTesting;
19  import com.google.common.base.Optional;
20  import com.google.common.base.Preconditions;
21  
22  import au.gov.amsa.geo.model.Bounds;
23  import au.gov.amsa.geo.model.Cell;
24  import au.gov.amsa.geo.model.CellValue;
25  import au.gov.amsa.geo.model.GridTraversor;
26  import au.gov.amsa.geo.model.Options;
27  import au.gov.amsa.geo.model.SegmentOptions;
28  import au.gov.amsa.geo.model.Util;
29  import au.gov.amsa.risky.format.BinaryFixes;
30  import au.gov.amsa.risky.format.Fix;
31  import au.gov.amsa.risky.format.HasPosition;
32  import au.gov.amsa.util.navigation.Position;
33  import rx.Observable;
34  import rx.Observable.OnSubscribe;
35  import rx.Observer;
36  import rx.Subscriber;
37  import rx.functions.Action1;
38  import rx.functions.Action2;
39  import rx.functions.Func0;
40  import rx.functions.Func1;
41  import rx.schedulers.Schedulers;
42  import ucar.ma2.Array;
43  import ucar.ma2.ArrayDouble;
44  import ucar.ma2.DataType;
45  import ucar.ma2.Index2D;
46  import ucar.ma2.InvalidRangeException;
47  import ucar.nc2.Attribute;
48  import ucar.nc2.Dimension;
49  import ucar.nc2.NetcdfFileWriter;
50  import ucar.nc2.Variable;
51  
52  public class DistanceTravelledCalculator {
53  
54      private static Logger log = Logger.getLogger(DistanceTravelledCalculator.class);
55  
56      private final Options options;
57      private final DistanceCalculationMetrics metrics;
58  
59      public DistanceTravelledCalculator(Options options, DistanceCalculationMetrics metrics) {
60          this.options = options;
61          this.metrics = metrics;
62      }
63  
64      /**
65       * Returns distinct cells and the the total nautical miles travelled in that
66       * cell. Uses RxJava {@link Observable}s to maximize throughput and will
67       * scale to use all available processors and to handle a large number of
68       * files (number of open file handles should be limited by number of
69       * available processors).
70       * 
71       * @param files
72       * @return
73       */
74      public Observable<CellAndDistance> calculateDistanceByCellFromFiles(Observable<File> files) {
75          // use a map-reduce approach where the parallel method shares out
76          // ('maps') the fixes by craft to multiple threads (number determined by
77          // available processors) and is passed the 'reduce'
78          // calculateDistanceByCellFromFile() method to combine the results.
79          int numFiles = files.count().toBlocking().single();
80          log.info("numFiles=" + numFiles);
81          AtomicLong fileCount = new AtomicLong();
82          AtomicLong cellCount = new AtomicLong(1);
83          return files
84                  // buffer for parallel processing of groups of files
85                  .buffer(Math.max(1,
86                          (int) Math.round(
87                                  Math.ceil(numFiles / Runtime.getRuntime().availableProcessors()))))
88                  .flatMap(fileList -> extractCellDistances(fileCount, cellCount, fileList))
89                  // sum distances into global map
90                  .collect(bigMapFactory(), collectCellDistances())
91                  // report the cell distances for the grid
92                  .flatMap(listCellDistances())
93                  // record total nm in metrics
94                  .doOnNext(sumNauticalMiles());
95      }
96  
97      private Func1<? super HashMap<Cell, Double>, Observable<CellAndDistance>> listCellDistances() {
98          return map -> Observable.from(map.entrySet())
99                  .map(entry -> new CellAndDistance(entry.getKey(), entry.getValue()));
100     }
101 
102     private Func0<HashMap<Cell, Double>> bigMapFactory() {
103         return () -> new HashMap<Cell, Double>(20_000_000, 1.0f);
104     }
105 
106     private Action2<HashMap<Cell, Double>, Map<Cell, Double>> collectCellDistances() {
107         return (a, b) -> {
108             // put all entries in b into a
109             long t = System.currentTimeMillis();
110             log.info("reducing");
111             for (Entry<Cell, Double> entry : b.entrySet()) {
112                 Double val = a.putIfAbsent(entry.getKey(), entry.getValue());
113                 if (val != null) {
114                     a.put(entry.getKey(), val + entry.getValue());
115                 }
116             }
117             log.info("reduced in " + (System.currentTimeMillis() - t) + "ms");
118         };
119     }
120 
121     private Observable<Map<Cell, Double>> extractCellDistances(AtomicLong fileCount,
122             AtomicLong cellCount, List<File> fileList) {
123         return // extract fixes from each file
124         Observable.from(fileList)
125                 .lift(Logging.<File> logger().showCount(fileCount).every(1000).log())
126                 .map(file -> BinaryFixes.from(file))
127                 // for one craft aggregate distance (not a
128                 // problem with SerializedObserver buffering
129                 // because each file relatively small), also
130                 // subscribes on computation() to get
131                 // concurrency
132                 .flatMap(toCraftCellAndDistances)
133                 // log
134                 .lift(Logging.<CellAndDistance> logger().showCount("cellsReceived", cellCount)
135                         .every(1_000_000).showMemory().log())
136                 // sum cell distances and emit maps of up to
137                 // 100K entries
138                 .lift(OperatorSumCellDistances.create(1_000_000))
139                 .subscribeOn(Schedulers.computation());
140     }
141 
142     private final Func1<Observable<Fix>, Observable<CellAndDistance>> toCraftCellAndDistances = new Func1<Observable<Fix>, Observable<CellAndDistance>>() {
143 
144         @Override
145         public Observable<CellAndDistance> call(Observable<Fix> allFixesForASingleCraft) {
146 
147             return allFixesForASingleCraft
148                     // count fixes
149                     .doOnNext(incrementFixesCount)
150                     // filter on time between startTime and finishTime if exist
151                     .filter(inTimeRange)
152                     // restrict to fixes in filter bounds
153                     .filter(inRegion)
154                     // sort fixes by position time
155                     // .toSortedList(au.gov.amsa.geo.Util.COMPARE_FIXES_BY_POSITION_TIME)
156                     // convert list to Observable and flatten
157                     // .concatMap(au.gov.amsa.geo.Util.TO_OBSERVABLE)
158                     // keep only positions that pass effective speed
159                     .lift(filterOnEffectiveSpeedOk())
160                     // filter on passed effective speed check
161                     .filter(check -> check.isOk())
162                     // map back to the fix again
163                     .map(check -> check.fix())
164                     // update metrics with fixes passing effective speed check
165                     .doOnNext(countFixesPassedEffectiveSpeedCheck)
166                     // pair them up again
167                     .buffer(2, 1)
168                     // segments only
169                     .filter(PAIRS_ONLY)
170                     // count segments
171                     .doOnNext(segment -> metrics.segments.incrementAndGet())
172                     // remove segments with invalid time separation
173                     .filter(timeDifferenceOk)
174                     // remove segments with invalid distance separation
175                     .filter(distanceOk)
176                     // calculate distances
177                     .flatMap(toCellAndDistance)
178                     // update counts of cells in each segment
179                     .doOnNext(countSegmentCells)
180                     // use memory to buffer if producing fast
181                     .onBackpressureBuffer();
182         }
183 
184     };
185 
186     private OperatorEffectiveSpeedChecker filterOnEffectiveSpeedOk() {
187         return new OperatorEffectiveSpeedChecker(options.getSegmentOptions());
188     }
189 
190     private final Func1<Fix, Boolean> inRegion = new Func1<Fix, Boolean>() {
191         @Override
192         public Boolean call(Fix fix) {
193             boolean in = options.getFilterBounds().contains(fix);
194             if (in)
195                 metrics.fixesWithinRegion.incrementAndGet();
196             return in;
197         }
198     };
199 
200     private final Func1<Fix, Boolean> inTimeRange = new Func1<Fix, Boolean>() {
201         @Override
202         public Boolean call(Fix fix) {
203 
204             boolean lowerBoundOk = !options.getStartTime().isPresent()
205                     || fix.time() >= options.getStartTime().get();
206             boolean upperBoundOk = !options.getFinishTime().isPresent()
207                     || fix.time() < options.getFinishTime().get();
208             boolean result = lowerBoundOk && upperBoundOk;
209             if (result)
210                 metrics.fixesInTimeRange.incrementAndGet();
211             return result;
212         }
213     };
214 
215     private final Func1<List<Fix>, Boolean> timeDifferenceOk = new Func1<List<Fix>, Boolean>() {
216         @Override
217         public Boolean call(List<Fix> pair) {
218             Preconditions.checkArgument(pair.size() == 2);
219             Fix a = pair.get(0);
220             Fix b = pair.get(1);
221             boolean ok = timeDifferenceOk(a, b, options.getSegmentOptions());
222             if (ok)
223                 metrics.segmentsTimeDifferenceOk.incrementAndGet();
224             return ok;
225         }
226     };
227 
228     private final Func1<List<Fix>, Boolean> distanceOk = new Func1<List<Fix>, Boolean>() {
229         @Override
230         public Boolean call(List<Fix> pair) {
231             Preconditions.checkArgument(pair.size() == 2);
232             Fix a = pair.get(0);
233             Fix b = pair.get(1);
234             boolean ok = distanceOk(a, b, options.getSegmentOptions());
235             if (ok)
236                 metrics.segmentsDistanceOk.incrementAndGet();
237             return ok;
238         }
239     };
240 
241     private static boolean timeDifferenceOk(Fix a, Fix b, SegmentOptions o) {
242         long timeDiffMs = Math.abs(a.time() - b.time());
243         return o.maxTimePerSegmentMs() == null || timeDiffMs <= o.maxTimePerSegmentMs();
244     }
245 
246     private static boolean distanceOk(Fix a, Fix b, SegmentOptions o) {
247         return o.maxDistancePerSegmentNm() > Util.greatCircleDistanceNm(a, b);
248     }
249 
250     private static final Func1<List<Fix>, Boolean> PAIRS_ONLY = new Func1<List<Fix>, Boolean>() {
251 
252         @Override
253         public Boolean call(List<Fix> list) {
254             return list.size() == 2;
255         }
256     };
257 
258     private final Func1<List<Fix>, Observable<CellAndDistance>> toCellAndDistance = new Func1<List<Fix>, Observable<CellAndDistance>>() {
259 
260         @Override
261         public Observable<CellAndDistance> call(List<Fix> pair) {
262             Preconditions.checkArgument(pair.size() == 2);
263             Fix fix1 = pair.get(0);
264             Fix fix2 = pair.get(1);
265             return getCellDistances(fix1, fix2, options);
266         }
267 
268     };
269 
270     private final Action1<CellAndDistance> countSegmentCells = new Action1<CellAndDistance>() {
271 
272         @Override
273         public void call(CellAndDistance cd) {
274             metrics.segmentCells.incrementAndGet();
275         }
276 
277     };
278 
279     @VisibleForTesting
280     static final Observable<CellAndDistance> getCellDistances(HasPosition a, HasPosition b,
281             Options options) {
282         return getCellDistances(Util.toPos(a), Util.toPos(b), options);
283     }
284 
285     @VisibleForTesting
286     static final Observable<CellAndDistance> getCellDistances(final Position a, final Position b,
287             final Options options) {
288 
289         return Observable.create(new OnSubscribe<CellAndDistance>() {
290 
291             @Override
292             public void call(Subscriber<? super CellAndDistance> subscriber) {
293                 try {
294                     GridTraversor grid = new GridTraversor(options);
295                     boolean keepGoing = true;
296                     Position p1 = a;
297                     Position destination = b;
298                     int count = 0;
299                     while (keepGoing) {
300                         Position p2 = grid.nextPoint(p1, destination);
301                         double distanceNm = p1.getDistanceToKm(p2) / 1.852;
302                         // report cell and distance
303                         Optional<Cell> cell = Cell.cellAt(p1.getLat(), p1.getLon(), options);
304                         if (cell.isPresent())
305                             subscriber.onNext(new CellAndDistance(cell.get(), distanceNm));
306                         keepGoing = p2.getLat() != destination.getLat()
307                                 || p2.getLon() != destination.getLon();
308                         keepGoing = keepGoing && !subscriber.isUnsubscribed();
309                         p1 = p2;
310                         count++;
311                         checkCount(p1, destination, count, options);
312                     }
313                     subscriber.onCompleted();
314                 } catch (Throwable t) {
315                     // TODO resolve all problems so that this will revert to a
316                     // call to onError
317                     log.warn(t.getMessage(), t);
318                     subscriber.onCompleted();
319                     // subscriber.onError(t);
320                 }
321             }
322 
323         });
324     }
325 
326     private static void checkCount(Position p1, Position destination, int count, Options options) {
327         if (count > 100000)
328             throw new RuntimeException("unexpectedly stuck in loop p1=" + p1 + ",destination="
329                     + destination + ",options=" + options);
330     }
331 
332     public DistanceCalculationMetrics getMetrics() {
333         return metrics;
334     }
335 
336     public static class CalculationResult {
337         private final Observable<CellValue> cells;
338         private final DistanceCalculationMetrics metrics;
339 
340         public CalculationResult(Observable<CellValue> cells, DistanceCalculationMetrics metrics) {
341             this.cells = cells;
342             this.metrics = metrics;
343         }
344 
345         public Observable<CellValue> getCells() {
346             return cells;
347         }
348 
349         public DistanceCalculationMetrics getMetrics() {
350             return metrics;
351         }
352 
353     }
354 
355     public static Observable<CellValue> calculateDensityByCellFromFiles(Options options,
356             Observable<File> files, int horizontal, int vertical,
357             DistanceCalculationMetrics metrics) {
358         Observable<CellValue> cells = partition(options, horizontal, vertical)
359                 // get results (blocks to limit memory use)
360                 .concatMap(calculateDistanceTravelled(files, metrics));
361 
362         if (horizontal > 1 || vertical > 1)
363             // aggregate using a file backed map
364             return cells.lift(new OperatorSumCellValues(true));
365         else
366             return cells;
367     }
368 
369     private static Func1<Options, Observable<CellValue>> calculateDistanceTravelled(
370             final Observable<File> files, final DistanceCalculationMetrics metrics) {
371         return new Func1<Options, Observable<CellValue>>() {
372 
373             @Override
374             public Observable<CellValue> call(Options options) {
375                 log.info("running distance calculation on " + options);
376                 DistanceTravelledCalculator c = new DistanceTravelledCalculator(options, metrics);
377                 // blocks to return answer, this is desirable because we need to
378                 // back the results with a file because they can get so large
379                 return Observable.from(c.calculateDistanceByCellFromFiles(files)
380                         // as cell density values
381                         .map(toCellDensityValue(options))
382                         // as list
383                         .toList()
384                         // block and get
385                         .toBlocking().single());
386             }
387         };
388     }
389 
390     public static CalculationResult calculateTrafficDensity(Options options,
391             Observable<File> files) {
392         return calculateTrafficDensity(options, files, 1, 1);
393     }
394 
395     public static CalculationResult calculateTrafficDensity(Options options, Observable<File> files,
396             int horizontal, int vertical) {
397         int maxNumCells = (int) Math.round(options.getBounds().getWidthDegrees()
398                 * options.getBounds().getHeightDegrees() / options.getCellSizeDegreesAsDouble()
399                 / options.getCellSizeDegreesAsDouble());
400         log.info("maxNumCells=" + maxNumCells);
401         DistanceCalculationMetrics metrics = new DistanceCalculationMetrics();
402         final Observable<CellValue> cells = DistanceTravelledCalculator
403                 .calculateDensityByCellFromFiles(options, files, horizontal, vertical, metrics);
404         return new CalculationResult(cells, metrics);
405     }
406 
407     private static Func1<CellAndDistance, CellValue> toCellDensityValue(final Options options) {
408         return new Func1<CellAndDistance, CellValue>() {
409 
410             @Override
411             public CellValue call(CellAndDistance cd) {
412                 return new CellValue(cd.getCell().getCentreLat(options),
413                         cd.getCell().getCentreLon(options), cd.getTrafficDensity(options));
414             }
415         };
416     }
417 
418     public static void saveCalculationResultAsText(Options options,
419             CalculationResult calculationResult, String filename) {
420         try {
421             final PrintWriter out = new PrintWriter(filename);
422             Bounds b = options.getBounds();
423             out.println(
424                     "#originLat, originLon, cellSizeDegrees, topLefLat, topLeftLon, bottomRightLat, bottomRightLon");
425             out.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\n", options.getOriginLat(),
426                     options.getOriginLon(), options.getCellSizeDegrees(), b.getTopLeftLat(),
427                     b.getTopLeftLon(), b.getBottomRightLat(), b.getBottomRightLon());
428             out.println("#centreLat, centreLon, distanceNmPerNm2");
429 
430             calculationResult.getCells().subscribe(new Observer<CellValue>() {
431 
432                 @Override
433                 public void onCompleted() {
434                     out.close();
435                 }
436 
437                 @Override
438                 public void onError(Throwable e) {
439                     out.close();
440                 }
441 
442                 @Override
443                 public void onNext(CellValue cell) {
444                     out.format("%s\t%s\t%s\n", cell.getCentreLat(), cell.getCentreLon(),
445                             cell.getValue());
446                 }
447             });
448 
449         } catch (FileNotFoundException e) {
450             throw new RuntimeException(e);
451         }
452     }
453 
454     public static void saveCalculationResultAsNetcdf(Options options,
455             CalculationResult calculationResult, String filename) {
456 
457         List<CellValue> list = calculationResult.getCells().toList().toBlocking().single();
458 
459         int maxLonIndex = list.stream()
460                 .map(cell -> options.getGrid().cellAt(cell.getCentreLat(), cell.getCentreLon()))
461                 .filter(x -> x.isPresent()) //
462                 .map(x -> x.get().getLonIndex())
463                 .max(Comparator.<Long> naturalOrder()) //
464                 .get().intValue();
465         int maxLatIndex = list.stream()
466                 .map(cell -> options.getGrid().cellAt(cell.getCentreLat(), cell.getCentreLon()))
467                 .filter(x -> x.isPresent()).map(x -> x.get().getLatIndex())
468                 .max(Comparator.<Long> naturalOrder()).get().intValue();
469 
470         File file = new File(filename);
471 
472         // Create the file.
473         NetcdfFileWriter f = null;
474         try {
475             // Create new netcdf-3 file with the given filename
476             f = NetcdfFileWriter.createNew(NetcdfFileWriter.Version.netcdf3, file.getPath());
477 
478             // In addition to the latitude and longitude dimensions, we will
479             // also create latitude and longitude netCDF variables which will
480             // hold the actual latitudes and longitudes. Since they hold data
481             // about the coordinate system, the netCDF term for these is:
482             // "coordinate variables."
483             Dimension dimLat = f.addDimension(null, "latitude", maxLatIndex + 1);
484             Dimension dimLon = f.addDimension(null, "longitude", maxLonIndex + 1);
485 
486             List<Dimension> dims = new ArrayList<Dimension>();
487             dims.add(dimLat);
488             dims.add(dimLon);
489 
490             // coordinate variables
491             Variable vLat = f.addVariable(null, "latitude", DataType.DOUBLE, "latitude");
492             Variable vLon = f.addVariable(null, "longitude", DataType.DOUBLE, "longitude");
493 
494             // value variables
495             Variable vDensity = f.addVariable(null, "traffic_density", DataType.DOUBLE, dims);
496 
497             // Define units attributes for coordinate vars. This attaches a
498             // text attribute to each of the coordinate variables, containing
499             // the units.
500 
501             vLon.addAttribute(new Attribute("units", "degrees_east"));
502             vLat.addAttribute(new Attribute("units", "degrees_north"));
503 
504             // Define units attributes for variables.
505             vDensity.addAttribute(new Attribute("units", "nm-1"));
506             vDensity.addAttribute(new Attribute("long_name", ""));
507 
508             // Write the coordinate variable data. This will put the latitudes
509             // and longitudes of our data grid into the netCDF file.
510             f.create();
511             {
512                 Array dataLat = Array.factory(DataType.DOUBLE, new int[] { dimLat.getLength() });
513                 Array dataLon = Array.factory(DataType.DOUBLE, new int[] { dimLon.getLength() });
514 
515                 // set latitudes
516                 for (int i = 0; i <= maxLatIndex; i++) {
517                     dataLat.setDouble(i, options.getGrid().centreLat(i));
518                 }
519 
520                 // set longitudes
521                 for (int i = 0; i <= maxLonIndex; i++) {
522                     dataLon.setDouble(i, options.getGrid().centreLon(i));
523                 }
524 
525                 f.write(vLat, dataLat);
526                 f.write(vLon, dataLon);
527             }
528 
529             // write the value variable data
530             {
531                 int[] iDim = new int[] { dimLat.getLength(), dimLon.getLength() };
532                 Array dataDensity = ArrayDouble.D2.factory(DataType.DOUBLE, iDim);
533 
534                 Index2D idx = new Index2D(iDim);
535 
536                 for (CellValue point : list) {
537                     Optional<Cell> cell = options.getGrid().cellAt(point.getCentreLat(),
538                             point.getCentreLon());
539                     if (cell.isPresent()) {
540                         idx.set((int) cell.get().getLatIndex(), (int) cell.get().getLonIndex());
541                         dataDensity.setDouble(idx, point.getValue());
542                     }
543                 }
544                 f.write(vDensity, dataDensity);
545             }
546         } catch (IOException | InvalidRangeException e) {
547             throw new RuntimeException(e);
548         } finally {
549             if (f != null) {
550                 try {
551                     f.close();
552                 } catch (IOException ioe) {
553                     throw new RuntimeException(ioe);
554                 }
555             }
556         }
557     }
558 
559     static List<Double> makeConstantDifference(List<Double> list) {
560         List<Double> result = new ArrayList<>();
561         double diff = list.get(1) - list.get(0);
562         Double previous = null;
563         for (int i = 0; i < list.size(); i++) {
564             double next;
565             if (previous == null) {
566                 next = list.get(0);
567             } else {
568                 next = previous + diff;
569             }
570             result.add(next);
571             previous = next;
572         }
573         return result;
574     }
575 
576     // public static void saveCalculationResultAsBinary(Options options,
577     // CalculationResult calculationResult, String filename) {
578     // try {
579     // List<CellValue> cells = calculationResult.getCells();
580     // DataOutputStream out = new DataOutputStream(
581     // new BufferedOutputStream(new FileOutputStream(filename)));
582     //
583     // Bounds b = options.getBounds();
584     // out.writeDouble(options.getCellSizeDegreesAsDouble());
585     // out.writeDouble(b.getTopLeftLat());
586     // out.writeDouble(b.getTopLeftLon());
587     // out.writeDouble(b.getBottomRightLat());
588     // out.writeDouble(b.getBottomRightLon());
589     //
590     // for (CellValue cell : cells) {
591     // out.writeDouble(cell.getCentreLat());
592     // out.writeDouble(cell.getCentreLon());
593     // out.writeDouble(cell.getValue());
594     // }
595     // out.close();
596     // } catch (FileNotFoundException e) {
597     // throw new RuntimeException(e);
598     // } catch (IOException e) {
599     // throw new RuntimeException(e);
600     // }
601     // }
602 
603     private Action1<CellAndDistance> sumNauticalMiles() {
604         return new Action1<CellAndDistance>() {
605             @Override
606             public void call(CellAndDistance cell) {
607                 metrics.totalNauticalMiles.addAndGet(cell.getDistanceNm());
608             }
609         };
610     }
611 
612     private final Action1<? super Fix> incrementFixesCount = new Action1<Fix>() {
613 
614         @Override
615         public void call(Fix fix) {
616             metrics.fixes.incrementAndGet();
617         }
618     };
619 
620     private final Action1<? super Fix> countFixesPassedEffectiveSpeedCheck = new Action1<Fix>() {
621 
622         @Override
623         public void call(Fix fix) {
624             metrics.fixesPassedEffectiveSpeedCheck.incrementAndGet();
625         }
626     };
627 
628     /**
629      * Returns a sequence of {@link Options} that are same as the source apart
630      * from the {@link Bounds} which are partitioned according to horizontal and
631      * vertical parameters. For map-reduce purposes we need to be able to
632      * partition the bounds of Options. Passing horizontal=1 and vertical=1 will
633      * return one item only being a copy of the source {@link Options}.
634      * 
635      * @param options
636      * @param horizontal
637      *            number of regions (with longitude)
638      * @param vertical
639      *            number of regions (with latitude)
640      * @return
641      */
642     public static Observable<Options> partition(final Options options, final int horizontal,
643             final int vertical) {
644         List<Options> list = new ArrayList<>();
645         Bounds bounds = options.getBounds();
646         double h = bounds.getWidthDegrees() / horizontal;
647         double v = bounds.getHeightDegrees() / vertical;
648         for (int i = 0; i < horizontal; i++) {
649             for (int j = 0; j < vertical; j++) {
650                 double lat = bounds.getTopLeftLat() - j * v;
651                 double lon = bounds.getTopLeftLon() + i * h;
652                 Bounds b = new Bounds(lat, lon, lat - v, lon + h);
653                 list.add(options.buildFrom().bounds(b).filterBounds(b.expand(7, 7)).build());
654             }
655         }
656         return Observable.from(list);
657     }
658 
659 }