View Javadoc
1   package au.gov.amsa.geo.distance;
2   
3   import java.util.HashMap;
4   import java.util.Map;
5   import java.util.Map.Entry;
6   import java.util.UUID;
7   
8   import org.apache.log4j.Logger;
9   
10  import rx.Observable.Operator;
11  import rx.Observer;
12  import rx.Subscriber;
13  import rx.observers.Subscribers;
14  import au.gov.amsa.geo.model.CellValue;
15  import au.gov.amsa.geo.model.Position;
16  
17  public class OperatorSumCellValues implements Operator<CellValue, CellValue> {
18  
19      private static Logger log = Logger.getLogger(OperatorSumCellValues.class);
20  
21      private static final int INITIAL_CAPACITY = 35000000;
22  
23      /**
24       * This takes about 100 bytes per entry of memory;
25       */
26      final Map<Position, Double> map;
27  
28      public OperatorSumCellValues(boolean useDisk) {
29          if (useDisk)
30              map = MapDb.INSTANCE.getDb().getHashMap(UUID.randomUUID().toString());
31          else
32              map = new HashMap<Position, Double>(INITIAL_CAPACITY, 1.0f);
33      }
34  
35      public OperatorSumCellValues() {
36          this(false);
37      }
38  
39      // private final AtomicLong count = new AtomicLong();
40  
41      @Override
42      public Subscriber<? super CellValue> call(final Subscriber<? super CellValue> child) {
43  
44          Subscriber<CellValue> parent = Subscribers.from(new Observer<CellValue>() {
45  
46              @Override
47              public void onCompleted() {
48                  // MapDb.INSTANCE.getDb().commit();
49                  try {
50                      log.info("starting to emit map values");
51                      synchronized (map) {
52                          for (Entry<Position, Double> entry : map.entrySet()) {
53                              CellValue cv = new CellValue(entry.getKey().lat(),
54                                      entry.getKey().lon(), entry.getValue().doubleValue());
55                              child.onNext(cv);
56                          }
57                      }
58                      child.onCompleted();
59                  } catch (Throwable t) {
60                      onError(t);
61                  }
62  
63              }
64  
65              @Override
66              public void onError(Throwable e) {
67                  child.onError(e);
68              }
69  
70              @Override
71              public void onNext(CellValue cv) {
72                  Position position = new Position((float) cv.getCentreLat(), (float) cv
73                          .getCentreLon());
74                  Double val = map.putIfAbsent(position, cv.getValue());
75                  if (val != null)
76                      map.put(position, val + cv.getValue());
77              }
78          });
79          child.add(parent);
80          return parent;
81      }
82  }