1 package au.gov.amsa.geo;
2
3 import java.nio.ByteBuffer;
4 import java.util.concurrent.atomic.AtomicBoolean;
5
6 import rx.Observable.Operator;
7 import rx.Observer;
8 import rx.Subscriber;
9 import rx.observers.Subscribers;
10 import au.gov.amsa.geo.model.Bounds;
11 import au.gov.amsa.geo.model.CellValue;
12 import au.gov.amsa.geo.model.Options;
13
14 public class OperatorCellValuesToBytes implements Operator<byte[], CellValue> {
15
16 private final Options options;
17
18 public OperatorCellValuesToBytes(Options options) {
19 this.options = options;
20 }
21
22 @Override
23 public Subscriber<? super CellValue> call(
24 final Subscriber<? super byte[]> child) {
25
26 Subscriber<CellValue> parent = Subscribers
27 .from(new Observer<CellValue>() {
28
29 final AtomicBoolean first = new AtomicBoolean(true);
30
31 @Override
32 public void onCompleted() {
33 child.onCompleted();
34 }
35
36 @Override
37 public void onError(Throwable e) {
38 child.onError(e);
39 }
40
41 @Override
42 public void onNext(CellValue cv) {
43 if (first.getAndSet(false))
44 child.onNext(toBytes(options));
45 child.onNext(toBytes(cv));
46 }
47 });
48 child.add(parent);
49 return parent;
50 }
51
52 private static byte[] toBytes(Options options) {
53 ByteBuffer bb = ByteBuffer.allocate(40);
54 Bounds b = options.getBounds();
55 bb.putDouble(options.getCellSizeDegreesAsDouble());
56 bb.putDouble(b.getTopLeftLat());
57 bb.putDouble(b.getTopLeftLon());
58 bb.putDouble(b.getBottomRightLat());
59 bb.putDouble(b.getBottomRightLon());
60 return bb.array();
61 }
62
63 private static byte[] toBytes(CellValue cv) {
64 ByteBuffer bb = ByteBuffer.allocate(16);
65 bb.putFloat((float) cv.getCentreLat());
66 bb.putFloat((float) cv.getCentreLon());
67 bb.putDouble(cv.getValue());
68 return bb.array();
69 }
70 }