1 package au.gov.amsa.geo;
2
3 import java.io.BufferedInputStream;
4 import java.io.DataInputStream;
5 import java.io.EOFException;
6 import java.io.File;
7 import java.io.FileInputStream;
8 import java.io.FileNotFoundException;
9 import java.io.IOException;
10 import java.io.InputStream;
11
12 import rx.Observable;
13 import rx.Observable.OnSubscribe;
14 import rx.Subscriber;
15 import au.gov.amsa.geo.model.CellValue;
16
17 public class BinaryCellValuesObservable {
18
19
20
21
22
23
24
25
26 public static Observable<?> readValues(final InputStream is) {
27 return Observable.create(createOnSubscribe(null, is));
28 }
29
30 public static Observable<?> readValues(final File file) {
31 return Observable.create(createOnSubscribe(file, null));
32 }
33
34 private static OnSubscribe<Object> createOnSubscribe(final File file,
35 final InputStream is) {
36 return new OnSubscribe<Object>() {
37 @Override
38 public void call(Subscriber<Object> subscriber) {
39 final InputStream inputStream;
40 if (file != null) {
41 try {
42 inputStream = new FileInputStream(file);
43 } catch (FileNotFoundException e) {
44 subscriber.onError(e);
45 return;
46 }
47 } else
48 inputStream = is;
49 DataInputStream in = new DataInputStream(
50 new BufferedInputStream(inputStream, 8192));
51 try {
52 double cellSizeDegrees = in.readDouble();
53 subscriber.onNext(cellSizeDegrees);
54
55 in.readDouble();
56
57 in.readDouble();
58
59 in.readDouble();
60
61 in.readDouble();
62 while (!subscriber.isUnsubscribed()) {
63 double centreLat = in.readFloat();
64 double centreLon = in.readFloat();
65 double value = in.readDouble();
66 subscriber.onNext(new CellValue(centreLat, centreLon,
67 value));
68 }
69 close(in);
70 subscriber.onCompleted();
71 } catch (EOFException e) {
72 close(in);
73 subscriber.onCompleted();
74 } catch (Throwable e) {
75 close(in);
76 subscriber.onError(e);
77 }
78 }
79
80 private void close(DataInputStream in) {
81 try {
82 in.close();
83 } catch (IOException e) {
84
85 }
86 }
87 };
88
89 }
90
91 }