1 package au.gov.amsa.streams; 2 3 import java.io.IOException; 4 import java.io.Reader; 5 6 import rx.Observable; 7 import rx.Observer; 8 import rx.observables.SyncOnSubscribe; 9 10 public final class OnSubscribeReader extends SyncOnSubscribe<Reader, String> { 11 12 private final Reader reader; 13 14 private final char[] buffer; 15 16 public OnSubscribeReader(Reader reader, int size) { 17 this.reader = reader; 18 this.buffer = new char[size]; 19 } 20 21 @Override 22 protected Reader generateState() { 23 return reader; 24 } 25 26 @Override 27 protected Reader next(Reader reader, Observer<? super String> observer) { 28 try { 29 int count = reader.read(buffer); 30 if (count == -1) 31 observer.onCompleted(); 32 else 33 observer.onNext(String.valueOf(buffer, 0, count)); 34 } catch (IOException e) { 35 observer.onError(e); 36 } 37 return reader; 38 } 39 40 public Observable<String> toObservable() { 41 return Observable.create(this); 42 } 43 }