1 package au.gov.amsa.streams;
2
3 import java.io.IOException;
4 import java.io.Reader;
5
6 import rx.Observable;
7 import rx.Observer;
8 import rx.observables.SyncOnSubscribe;
9
10 public final class OnSubscribeReader extends SyncOnSubscribe<Reader, String> {
11
12 private final Reader reader;
13
14 private final char[] buffer;
15
16 public OnSubscribeReader(Reader reader, int size) {
17 this.reader = reader;
18 this.buffer = new char[size];
19 }
20
21 @Override
22 protected Reader generateState() {
23 return reader;
24 }
25
26 @Override
27 protected Reader next(Reader reader, Observer<? super String> observer) {
28 try {
29 int count = reader.read(buffer);
30 if (count == -1)
31 observer.onCompleted();
32 else
33 observer.onNext(String.valueOf(buffer, 0, count));
34 } catch (IOException e) {
35 observer.onError(e);
36 }
37 return reader;
38 }
39
40 public Observable<String> toObservable() {
41 return Observable.create(this);
42 }
43 }