package au.gov.amsa.streams;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import rx.Observable;
import rx.Observer;
import rx.observables.SyncOnSubscribe;

/**
 * Emits the contents of an {@link InputStream} as a sequence of byte arrays,
 * each holding at most {@code size} bytes.
 *
 * <p>
 * The same stream instance is handed out as the state for every subscription,
 * so concurrent or repeated subscriptions read from (and consume) the one
 * underlying stream. This class does not close the stream.
 */
public final class OnSubscribeInputStream extends SyncOnSubscribe<InputStream, byte[]> {

    private final InputStream is;
    private final int size;

    /**
     * Creates the source.
     *
     * @param is
     *            stream to read from, must not be null
     * @param size
     *            maximum number of bytes per emitted array, must be positive
     * @throws NullPointerException
     *             if {@code is} is null
     * @throws IllegalArgumentException
     *             if {@code size} is not positive
     */
    public OnSubscribeInputStream(InputStream is, int size) {
        if (is == null) {
            throw new NullPointerException("is cannot be null");
        }
        // A zero-length buffer makes InputStream.read(byte[]) return 0 forever
        // (never -1), which would emit empty arrays without ever completing;
        // a negative size would only fail later when the buffer is allocated.
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive but was " + size);
        }
        this.is = is;
        this.size = size;
    }

    /**
     * Returns the wrapped stream as the per-subscription state.
     *
     * @return the stream passed to the constructor
     */
    @Override
    protected InputStream generateState() {
        return is;
    }

    /**
     * Reads one chunk from the stream and signals the observer exactly once:
     * {@code onNext} with the bytes read, {@code onCompleted} at end of stream,
     * or {@code onError} if the read throws an {@link IOException}.
     *
     * @param stream
     *            the stream to read from
     * @param observer
     *            receiver of the chunk or terminal event
     * @return the same stream, to be used as the state for the next call
     */
    @Override
    protected InputStream next(InputStream stream, Observer<? super byte[]> observer) {
        byte[] buffer = new byte[size];
        try {
            int count = stream.read(buffer);
            if (count == -1) {
                observer.onCompleted();
            } else if (count < size) {
                // partial read: trim so the emitted array holds only valid bytes
                observer.onNext(Arrays.copyOf(buffer, count));
            } else {
                observer.onNext(buffer);
            }
        } catch (IOException e) {
            observer.onError(e);
        }
        return stream;
    }

    /**
     * Wraps this source in an {@link Observable}.
     *
     * @return an observable created from this on-subscribe implementation
     */
    public Observable<byte[]> toObservable() {
        return Observable.create(this);
    }
}