View Javadoc
1   package au.gov.amsa.streams;
2   
3   import java.io.IOException;
4   import java.io.Reader;
5   
6   import rx.Observable;
7   import rx.Observer;
8   import rx.observables.SyncOnSubscribe;
9   
10  public final class OnSubscribeReader extends SyncOnSubscribe<Reader, String> {
11  
12      private final Reader reader;
13      
14      private final char[] buffer;
15  
16      public OnSubscribeReader(Reader reader, int size) {
17          this.reader = reader;
18          this.buffer = new char[size];
19      }
20  
21      @Override
22      protected Reader generateState() {
23          return reader;
24      }
25  
26      @Override
27      protected Reader next(Reader reader, Observer<? super String> observer) {
28          try {
29              int count = reader.read(buffer);
30              if (count == -1)
31                  observer.onCompleted();
32              else
33                  observer.onNext(String.valueOf(buffer, 0, count));
34          } catch (IOException e) {
35              observer.onError(e);
36          }
37          return reader;
38      }
39  
40      public Observable<String> toObservable() {
41          return Observable.create(this);
42      }
43  }