View Javadoc
1   package au.gov.amsa.streams;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.util.Arrays;
6   
7   import rx.Observable;
8   import rx.Observer;
9   import rx.observables.SyncOnSubscribe;
10  
11  public final class OnSubscribeInputStream extends SyncOnSubscribe<InputStream, byte[]> {
12  
13      private final InputStream is;
14      private final int size;
15  
16      public OnSubscribeInputStream(InputStream is, int size) {
17          this.is = is;
18          this.size = size;
19      }
20  
21      @Override
22      protected InputStream generateState() {
23          return is;
24      }
25  
26      @Override
27      protected InputStream next(InputStream is, Observer<? super byte[]> observer) {
28          byte[] buffer = new byte[size];
29          try {
30              int count = is.read(buffer);
31              if (count == -1)
32                  observer.onCompleted();
33              else if (count < size)
34                  observer.onNext(Arrays.copyOf(buffer, count));
35              else
36                  observer.onNext(buffer);
37          } catch (IOException e) {
38              observer.onError(e);
39          }
40          return is;
41      }
42  
43      public Observable<byte[]> toObservable() {
44          return Observable.create(this);
45      }
46  }