1 package au.gov.amsa.risky.format;
2
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.FileNotFoundException;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.nio.ByteBuffer;
9 import java.util.LinkedList;
10 import java.util.Queue;
11 import java.util.zip.GZIPInputStream;
12
13 import com.github.davidmoten.util.Optional;
14
15 import au.gov.amsa.risky.format.BinaryFixesOnSubscribeWithBackp.State;
16 import rx.Observable;
17 import rx.Observer;
18 import rx.functions.Action1;
19 import rx.functions.Func0;
20 import rx.functions.Func1;
21 import rx.observables.SyncOnSubscribe;
22
23 public final class BinaryFixesOnSubscribeWithBackp extends SyncOnSubscribe<State, Fix> {
24
25 private final InputStream is;
26 private final Optional<Integer> mmsi;
27 private final BinaryFixesFormat format;
28
29 public BinaryFixesOnSubscribeWithBackp(InputStream is, Optional<Integer> mmsi, BinaryFixesFormat format) {
30 this.is = is;
31 this.mmsi = mmsi;
32 this.format = format;
33 }
34
35 public final static class State {
36 final InputStream is;
37 final Optional<Integer> mmsi;
38 final Queue<Fix> queue;
39
40 public State(InputStream is, Optional<Integer> mmsi, Queue<Fix> queue) {
41 this.is = is;
42 this.mmsi = mmsi;
43 this.queue = queue;
44 }
45
46 }
47
48
49
50
51
52
53
54
55 public static Observable<Fix> from(final File file, BinaryFixesFormat format) {
56
57 Func0<InputStream> resourceFactory = new Func0<InputStream>() {
58
59 @Override
60 public InputStream call() {
61 try {
62 if (file.getName().endsWith(".gz"))
63 return new GZIPInputStream(new FileInputStream(file));
64 else
65 return new FileInputStream(file);
66 } catch (FileNotFoundException e) {
67 throw new RuntimeException(e);
68 } catch (IOException e) {
69 throw new RuntimeException(e);
70 }
71 }
72 };
73
74 Func1<InputStream, Observable<Fix>> obsFactory = new Func1<InputStream, Observable<Fix>>() {
75
76 @Override
77 public Observable<Fix> call(InputStream is) {
78 Optional<Integer> mmsi;
79 if (format == BinaryFixesFormat.WITH_MMSI)
80 mmsi = Optional.absent();
81 else
82 mmsi = Optional.of(BinaryFixesUtil.getMmsi(file));
83
84 return Observable.create(new BinaryFixesOnSubscribeWithBackp(is, mmsi, format));
85 }
86 };
87 Action1<InputStream> disposeAction = new Action1<InputStream>() {
88
89 @Override
90 public void call(InputStream is) {
91 try {
92 is.close();
93 } catch (IOException e) {
94 throw new RuntimeException(e);
95 }
96 }
97 };
98 return Observable.using(resourceFactory, obsFactory, disposeAction, true);
99
100 }
101
102 @Override
103 protected State generateState() {
104 return new State(is, mmsi, new LinkedList<Fix>());
105 }
106
107 @Override
108 protected State next(State state, Observer<? super Fix> observer) {
109 int recordSize = BinaryFixes.recordSize(format);
110 Fix f = state.queue.poll();
111 if (f != null)
112 observer.onNext(f);
113 else {
114 byte[] bytes = new byte[4096 * BinaryFixes.recordSize(format)];
115 try {
116 int length;
117 if ((length = readFully(state.is, bytes)) > 0) {
118 for (int i = 0; i < length; i += recordSize) {
119 ByteBuffer bb = ByteBuffer.wrap(bytes, i, recordSize);
120 final int mmsi;
121 if (state.mmsi.isPresent()) {
122 mmsi = state.mmsi.get();
123 } else {
124 mmsi = bb.getInt();
125 }
126 Fix fix = BinaryFixesUtil.toFix(mmsi, bb);
127 state.queue.add(fix);
128 }
129 observer.onNext(state.queue.remove());
130 } else
131 observer.onCompleted();
132 } catch (IOException e) {
133 observer.onError(e);
134 }
135 }
136 return state;
137 }
138
139 private static final int readFully(InputStream in, byte[] bytes) throws IOException {
140 int n = 0;
141 int len = bytes.length;
142 while (n < len) {
143 int count = in.read(bytes, n, len - n);
144 if (count < 0) {
145 if (n == 0)
146 return -1;
147 else
148 return n;
149 }
150 n += count;
151 }
152 return n;
153 }
154
155 }