View Javadoc
1   package au.gov.amsa.risky.format;
2   
3   import java.io.File;
4   import java.io.FileInputStream;
5   import java.io.FileNotFoundException;
6   import java.io.IOException;
7   import java.io.InputStream;
8   import java.nio.ByteBuffer;
9   import java.util.LinkedList;
10  import java.util.Queue;
11  import java.util.zip.GZIPInputStream;
12  
13  import com.github.davidmoten.util.Optional;
14  
15  import au.gov.amsa.risky.format.BinaryFixesOnSubscribeWithBackp.State;
16  import rx.Observable;
17  import rx.Observer;
18  import rx.functions.Action1;
19  import rx.functions.Func0;
20  import rx.functions.Func1;
21  import rx.observables.SyncOnSubscribe;
22  
23  public final class BinaryFixesOnSubscribeWithBackp extends SyncOnSubscribe<State, Fix> {
24  
25      private final InputStream is;
26      private final Optional<Integer> mmsi;
27      private final BinaryFixesFormat format;
28  
29      public BinaryFixesOnSubscribeWithBackp(InputStream is, Optional<Integer> mmsi, BinaryFixesFormat format) {
30          this.is = is;
31          this.mmsi = mmsi;
32          this.format = format;
33      }
34  
35      public final static class State {
36          final InputStream is;
37          final Optional<Integer> mmsi;
38          final Queue<Fix> queue;
39  
40          public State(InputStream is, Optional<Integer> mmsi, Queue<Fix> queue) {
41              this.is = is;
42              this.mmsi = mmsi;
43              this.queue = queue;
44          }
45  
46      }
47  
48      /**
49       * Returns stream of fixes from the given file. If the file name ends in '.gz'
50       * then the file is unzipped before being read.
51       * 
52       * @param file
53       * @return fixes stream
54       */
55      public static Observable<Fix> from(final File file, BinaryFixesFormat format) {
56  
57          Func0<InputStream> resourceFactory = new Func0<InputStream>() {
58  
59              @Override
60              public InputStream call() {
61                  try {
62                      if (file.getName().endsWith(".gz"))
63                          return new GZIPInputStream(new FileInputStream(file));
64                      else
65                          return new FileInputStream(file);
66                  } catch (FileNotFoundException e) {
67                      throw new RuntimeException(e);
68                  } catch (IOException e) {
69                      throw new RuntimeException(e);
70                  }
71              }
72          };
73  
74          Func1<InputStream, Observable<Fix>> obsFactory = new Func1<InputStream, Observable<Fix>>() {
75  
76              @Override
77              public Observable<Fix> call(InputStream is) {
78                  Optional<Integer> mmsi;
79                  if (format == BinaryFixesFormat.WITH_MMSI)
80                      mmsi = Optional.absent();
81                  else
82                      mmsi = Optional.of(BinaryFixesUtil.getMmsi(file));
83  
84                  return Observable.create(new BinaryFixesOnSubscribeWithBackp(is, mmsi, format));
85              }
86          };
87          Action1<InputStream> disposeAction = new Action1<InputStream>() {
88  
89              @Override
90              public void call(InputStream is) {
91                  try {
92                      is.close();
93                  } catch (IOException e) {
94                      throw new RuntimeException(e);
95                  }
96              }
97          };
98          return Observable.using(resourceFactory, obsFactory, disposeAction, true);
99  
100     }
101 
102     @Override
103     protected State generateState() {
104         return new State(is, mmsi, new LinkedList<Fix>());
105     }
106 
107     @Override
108     protected State next(State state, Observer<? super Fix> observer) {
109         int recordSize = BinaryFixes.recordSize(format);
110         Fix f = state.queue.poll();
111         if (f != null)
112             observer.onNext(f);
113         else {
114             byte[] bytes = new byte[4096 * BinaryFixes.recordSize(format)];
115             try {
116                 int length;
117                 if ((length = readFully(state.is, bytes)) > 0) {
118                     for (int i = 0; i < length; i += recordSize) {
119                         ByteBuffer bb = ByteBuffer.wrap(bytes, i, recordSize);
120                         final int mmsi;
121                         if (state.mmsi.isPresent()) {
122                             mmsi = state.mmsi.get();
123                         } else {
124                             mmsi = bb.getInt();
125                         }
126                         Fix fix = BinaryFixesUtil.toFix(mmsi, bb);
127                         state.queue.add(fix);
128                     }
129                     observer.onNext(state.queue.remove());
130                 } else
131                     observer.onCompleted();
132             } catch (IOException e) {
133                 observer.onError(e);
134             }
135         }
136         return state;
137     }
138 
139     private static final int readFully(InputStream in, byte[] bytes) throws IOException {
140         int n = 0;
141         int len = bytes.length;
142         while (n < len) {
143             int count = in.read(bytes, n, len - n);
144             if (count < 0) {
145                 if (n == 0)
146                     return -1;
147                 else
148                     return n;
149             }
150             n += count;
151         }
152         return n;
153     }
154 
155 }