View Javadoc
1   package au.gov.amsa.geo.distance;
2   
3   import java.util.Collections;
4   import java.util.HashMap;
5   import java.util.Map;
6   
7   import au.gov.amsa.geo.model.Cell;
8   import rx.Observable.Operator;
9   import rx.Subscriber;
10  
11  public final class OperatorSumCellDistances
12          implements Operator<Map<Cell, Double>, CellAndDistance> {
13  
14      private static final int INITIAL_CAPACITY = 100000;
15  
16      /**
17       * This takes about 100 bytes per entry of memory;
18       */
19      private Map<Cell, Double> map = createMap();
20  
21      private final int maxSize;
22  
23      private OperatorSumCellDistances(int maxSize) {
24          this.maxSize = maxSize;
25      }
26  
27      public static OperatorSumCellDistances create(int maxSize) {
28          return new OperatorSumCellDistances(maxSize);
29      }
30  
31      @Override
32      public Subscriber<? super CellAndDistance> call(
33              final Subscriber<? super Map<Cell, Double>> child) {
34  
35          Subscriber<CellAndDistance> parent = new Subscriber<CellAndDistance>(child) {
36  
37              @Override
38              public void onCompleted() {
39                  try {
40                      child.onNext(Collections.unmodifiableMap(map));
41                      child.onCompleted();
42                  } catch (Throwable t) {
43                      onError(t);
44                  }
45              }
46  
47              @Override
48              public void onError(Throwable e) {
49                  child.onError(e);
50              }
51  
52              @Override
53              public void onNext(CellAndDistance cd) {
54                  Cell key = cd.getCell();
55  
56                  Double val = map.putIfAbsent(key, cd.getDistanceNm());
57                  if (val != null) {
58                      map.put(key, val + cd.getDistanceNm());
59                      request(1);
60                  } else {
61                      if (map.size() == maxSize) {
62                          Map<Cell, Double> m = map;
63                          map = createMap();
64                          child.onNext(Collections.unmodifiableMap(m));
65                      } else
66                          request(1);
67                  }
68              }
69          };
70          return parent;
71      }
72  
73      private static Map<Cell, Double> createMap() {
74          return new HashMap<Cell, Double>(INITIAL_CAPACITY, 1.0f);
75      }
76  }