View Javadoc
1   package au.gov.amsa.geo;
2   
3   import java.io.BufferedInputStream;
4   import java.io.DataInputStream;
5   import java.io.EOFException;
6   import java.io.File;
7   import java.io.FileInputStream;
8   import java.io.FileNotFoundException;
9   import java.io.IOException;
10  import java.io.InputStream;
11  
12  import rx.Observable;
13  import rx.Observable.OnSubscribe;
14  import rx.Subscriber;
15  import au.gov.amsa.geo.model.CellValue;
16  
17  public class BinaryCellValuesObservable {
18  
19  	/**
20  	 * Returns a sequence one Double value being the cellSizeDegrees followed by
21  	 * multiple {@link CellValueImpl}s.
22  	 * 
23  	 * @param is
24  	 * @return
25  	 */
26  	public static Observable<?> readValues(final InputStream is) {
27  		return Observable.create(createOnSubscribe(null, is));
28  	}
29  
30  	public static Observable<?> readValues(final File file) {
31  		return Observable.create(createOnSubscribe(file, null));
32  	}
33  
34  	private static OnSubscribe<Object> createOnSubscribe(final File file,
35  			final InputStream is) {
36  		return new OnSubscribe<Object>() {
37  			@Override
38  			public void call(Subscriber<Object> subscriber) {
39  				final InputStream inputStream;
40  				if (file != null) {
41  					try {
42  						inputStream = new FileInputStream(file);
43  					} catch (FileNotFoundException e) {
44  						subscriber.onError(e);
45  						return;
46  					}
47  				} else
48  					inputStream = is;
49  				DataInputStream in = new DataInputStream(
50  						new BufferedInputStream(inputStream, 8192));
51  				try {
52  					double cellSizeDegrees = in.readDouble();
53  					subscriber.onNext(cellSizeDegrees);
54  					// topLeftLat
55  					in.readDouble();
56  					// topLeftLon
57  					in.readDouble();
58  					// bottomRightLat
59  					in.readDouble();
60  					// bottomRightLon
61  					in.readDouble();
62  					while (!subscriber.isUnsubscribed()) {
63  						double centreLat = in.readFloat();
64  						double centreLon = in.readFloat();
65  						double value = in.readDouble();
66  						subscriber.onNext(new CellValue(centreLat, centreLon,
67  								value));
68  					}
69  					close(in);
70  					subscriber.onCompleted();
71  				} catch (EOFException e) {
72  					close(in);
73  					subscriber.onCompleted();
74  				} catch (Throwable e) {
75  					close(in);
76  					subscriber.onError(e);
77  				}
78  			}
79  
80  			private void close(DataInputStream in) {
81  				try {
82  					in.close();
83  				} catch (IOException e) {
84  					// ignore close problem
85  				}
86  			}
87  		};
88  
89  	}
90  
91  }