1 package au.gov.amsa.geo.distance;
2
3 import java.util.Collections;
4 import java.util.HashMap;
5 import java.util.Map;
6
7 import au.gov.amsa.geo.model.Cell;
8 import rx.Observable.Operator;
9 import rx.Subscriber;
10
11 public final class OperatorSumCellDistances
12 implements Operator<Map<Cell, Double>, CellAndDistance> {
13
14 private static final int INITIAL_CAPACITY = 100000;
15
16
17
18
19 private Map<Cell, Double> map = createMap();
20
21 private final int maxSize;
22
23 private OperatorSumCellDistances(int maxSize) {
24 this.maxSize = maxSize;
25 }
26
27 public static OperatorSumCellDistances create(int maxSize) {
28 return new OperatorSumCellDistances(maxSize);
29 }
30
31 @Override
32 public Subscriber<? super CellAndDistance> call(
33 final Subscriber<? super Map<Cell, Double>> child) {
34
35 Subscriber<CellAndDistance> parent = new Subscriber<CellAndDistance>(child) {
36
37 @Override
38 public void onCompleted() {
39 try {
40 child.onNext(Collections.unmodifiableMap(map));
41 child.onCompleted();
42 } catch (Throwable t) {
43 onError(t);
44 }
45 }
46
47 @Override
48 public void onError(Throwable e) {
49 child.onError(e);
50 }
51
52 @Override
53 public void onNext(CellAndDistance cd) {
54 Cell key = cd.getCell();
55
56 Double val = map.putIfAbsent(key, cd.getDistanceNm());
57 if (val != null) {
58 map.put(key, val + cd.getDistanceNm());
59 request(1);
60 } else {
61 if (map.size() == maxSize) {
62 Map<Cell, Double> m = map;
63 map = createMap();
64 child.onNext(Collections.unmodifiableMap(m));
65 } else
66 request(1);
67 }
68 }
69 };
70 return parent;
71 }
72
73 private static Map<Cell, Double> createMap() {
74 return new HashMap<Cell, Double>(INITIAL_CAPACITY, 1.0f);
75 }
76 }