1 package au.gov.amsa.streams;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Arrays;
6
7 import rx.Observable;
8 import rx.Observer;
9 import rx.observables.SyncOnSubscribe;
10
11 public final class OnSubscribeInputStream extends SyncOnSubscribe<InputStream, byte[]> {
12
13 private final InputStream is;
14 private final int size;
15
16 public OnSubscribeInputStream(InputStream is, int size) {
17 this.is = is;
18 this.size = size;
19 }
20
21 @Override
22 protected InputStream generateState() {
23 return is;
24 }
25
26 @Override
27 protected InputStream next(InputStream is, Observer<? super byte[]> observer) {
28 byte[] buffer = new byte[size];
29 try {
30 int count = is.read(buffer);
31 if (count == -1)
32 observer.onCompleted();
33 else if (count < size)
34 observer.onNext(Arrays.copyOf(buffer, count));
35 else
36 observer.onNext(buffer);
37 } catch (IOException e) {
38 observer.onError(e);
39 }
40 return is;
41 }
42
43 public Observable<byte[]> toObservable() {
44 return Observable.create(this);
45 }
46 }