View Javadoc
1   package au.gov.amsa.geo;
2   
3   import java.nio.ByteBuffer;
4   import java.util.concurrent.atomic.AtomicBoolean;
5   
6   import rx.Observable.Operator;
7   import rx.Observer;
8   import rx.Subscriber;
9   import rx.observers.Subscribers;
10  import au.gov.amsa.geo.model.Bounds;
11  import au.gov.amsa.geo.model.CellValue;
12  import au.gov.amsa.geo.model.Options;
13  
14  public class OperatorCellValuesToBytes implements Operator<byte[], CellValue> {
15  
16  	private final Options options;
17  
18  	public OperatorCellValuesToBytes(Options options) {
19  		this.options = options;
20  	}
21  
22  	@Override
23  	public Subscriber<? super CellValue> call(
24  			final Subscriber<? super byte[]> child) {
25  
26  		Subscriber<CellValue> parent = Subscribers
27  				.from(new Observer<CellValue>() {
28  
29  					final AtomicBoolean first = new AtomicBoolean(true);
30  
31  					@Override
32  					public void onCompleted() {
33  						child.onCompleted();
34  					}
35  
36  					@Override
37  					public void onError(Throwable e) {
38  						child.onError(e);
39  					}
40  
41  					@Override
42  					public void onNext(CellValue cv) {
43  						if (first.getAndSet(false))
44  							child.onNext(toBytes(options));
45  						child.onNext(toBytes(cv));
46  					}
47  				});
48  		child.add(parent);
49  		return parent;
50  	}
51  
52  	private static byte[] toBytes(Options options) {
53  		ByteBuffer bb = ByteBuffer.allocate(40);
54  		Bounds b = options.getBounds();
55  		bb.putDouble(options.getCellSizeDegreesAsDouble());
56  		bb.putDouble(b.getTopLeftLat());
57  		bb.putDouble(b.getTopLeftLon());
58  		bb.putDouble(b.getBottomRightLat());
59  		bb.putDouble(b.getBottomRightLon());
60  		return bb.array();
61  	}
62  
63  	private static byte[] toBytes(CellValue cv) {
64  		ByteBuffer bb = ByteBuffer.allocate(16);
65  		bb.putFloat((float) cv.getCentreLat());
66  		bb.putFloat((float) cv.getCentreLon());
67  		bb.putDouble(cv.getValue());
68  		return bb.array();
69  	}
70  }