1 package au.gov.amsa.geo.distance;
2
3 import java.util.HashMap;
4 import java.util.Map;
5 import java.util.Map.Entry;
6 import java.util.UUID;
7
8 import org.apache.log4j.Logger;
9
10 import rx.Observable.Operator;
11 import rx.Observer;
12 import rx.Subscriber;
13 import rx.observers.Subscribers;
14 import au.gov.amsa.geo.model.CellValue;
15 import au.gov.amsa.geo.model.Position;
16
17 public class OperatorSumCellValues implements Operator<CellValue, CellValue> {
18
19 private static Logger log = Logger.getLogger(OperatorSumCellValues.class);
20
21 private static final int INITIAL_CAPACITY = 35000000;
22
23
24
25
26 final Map<Position, Double> map;
27
28 public OperatorSumCellValues(boolean useDisk) {
29 if (useDisk)
30 map = MapDb.INSTANCE.getDb().getHashMap(UUID.randomUUID().toString());
31 else
32 map = new HashMap<Position, Double>(INITIAL_CAPACITY, 1.0f);
33 }
34
35 public OperatorSumCellValues() {
36 this(false);
37 }
38
39
40
41 @Override
42 public Subscriber<? super CellValue> call(final Subscriber<? super CellValue> child) {
43
44 Subscriber<CellValue> parent = Subscribers.from(new Observer<CellValue>() {
45
46 @Override
47 public void onCompleted() {
48
49 try {
50 log.info("starting to emit map values");
51 synchronized (map) {
52 for (Entry<Position, Double> entry : map.entrySet()) {
53 CellValue cv = new CellValue(entry.getKey().lat(),
54 entry.getKey().lon(), entry.getValue().doubleValue());
55 child.onNext(cv);
56 }
57 }
58 child.onCompleted();
59 } catch (Throwable t) {
60 onError(t);
61 }
62
63 }
64
65 @Override
66 public void onError(Throwable e) {
67 child.onError(e);
68 }
69
70 @Override
71 public void onNext(CellValue cv) {
72 Position position = new Position((float) cv.getCentreLat(), (float) cv
73 .getCentreLon());
74 Double val = map.putIfAbsent(position, cv.getValue());
75 if (val != null)
76 map.put(position, val + cv.getValue());
77 }
78 });
79 child.add(parent);
80 return parent;
81 }
82 }