package au.gov.amsa.geo;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.observers.Subscribers;
import au.gov.amsa.geo.model.Bounds;
import au.gov.amsa.geo.model.CellValue;
import au.gov.amsa.geo.model.Options;

/**
 * Operator that turns a stream of {@link CellValue}s into a stream of
 * {@code byte[]} records.
 *
 * <p>
 * Immediately before the first cell record a single 40-byte header built from
 * the {@link Options} is emitted. Each cell then becomes one 16-byte record.
 * The header is only produced when the first cell arrives, so an upstream
 * that completes (or fails) without emitting any cell yields no header.
 *
 * <p>
 * All values are written through a freshly allocated {@link ByteBuffer},
 * i.e. in the buffer's default (big-endian) byte order.
 */
public class OperatorCellValuesToBytes implements Operator<byte[], CellValue> {

    /** Header size: five doubles (cell size plus the four bounds values). */
    private static final int HEADER_LENGTH = 40;

    /** Cell record size: two floats (centre lat/lon) plus one double (value). */
    private static final int CELL_LENGTH = 16;

    private final Options options;

    /**
     * @param options
     *            supplies the cell size and bounds written into the header
     */
    public OperatorCellValuesToBytes(Options options) {
        this.options = options;
    }

    /**
     * Wraps the downstream subscriber with one that serialises incoming cells.
     *
     * @param child
     *            the downstream subscriber receiving the byte arrays
     * @return the subscriber to be subscribed to the upstream cell source
     */
    @Override
    public Subscriber<? super CellValue> call(
            final Subscriber<? super byte[]> child) {
        final Subscriber<CellValue> upstream = Subscribers
                .from(new CellEncoder(child, options));
        // Register with the child so that unsubscribing the child also
        // unsubscribes the upstream subscriber.
        child.add(upstream);
        return upstream;
    }

    /**
     * Forwards terminal events unchanged and converts every cell into bytes,
     * preceding the very first cell with the header record.
     */
    private static final class CellEncoder implements Observer<CellValue> {

        private final Subscriber<? super byte[]> downstream;
        private final Options options;

        /** True until the header has been handed to the downstream. */
        private final AtomicBoolean headerPending = new AtomicBoolean(true);

        CellEncoder(Subscriber<? super byte[]> downstream, Options options) {
            this.downstream = downstream;
            this.options = options;
        }

        @Override
        public void onCompleted() {
            downstream.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            downstream.onError(e);
        }

        @Override
        public void onNext(CellValue cv) {
            // Only the first caller flips the flag and therefore emits the
            // header.
            if (headerPending.compareAndSet(true, false)) {
                downstream.onNext(toBytes(options));
            }
            downstream.onNext(toBytes(cv));
        }
    }

    /**
     * Encodes the header record: cell size in degrees, top-left latitude,
     * top-left longitude, bottom-right latitude and bottom-right longitude,
     * each as an 8-byte double and in that order.
     *
     * @param options
     *            source of the cell size and bounds
     * @return a new 40-byte array
     */
    private static byte[] toBytes(Options options) {
        final Bounds bounds = options.getBounds();
        return ByteBuffer.allocate(HEADER_LENGTH)
                .putDouble(options.getCellSizeDegreesAsDouble())
                .putDouble(bounds.getTopLeftLat())
                .putDouble(bounds.getTopLeftLon())
                .putDouble(bounds.getBottomRightLat())
                .putDouble(bounds.getBottomRightLon())
                .array();
    }

    /**
     * Encodes one cell record: centre latitude and centre longitude, each
     * narrowed to a 4-byte float, followed by the cell value as an 8-byte
     * double.
     *
     * @param cv
     *            the cell to encode
     * @return a new 16-byte array
     */
    private static byte[] toBytes(CellValue cv) {
        final float centreLat = (float) cv.getCentreLat();
        final float centreLon = (float) cv.getCentreLon();
        return ByteBuffer.allocate(CELL_LENGTH)
                .putFloat(centreLat)
                .putFloat(centreLon)
                .putDouble(cv.getValue())
                .array();
    }
}