View Javadoc
1   package au.gov.amsa.streams;
2   
3   import java.io.File;
4   import java.io.FileInputStream;
5   import java.io.FileNotFoundException;
6   import java.io.IOException;
7   import java.io.InputStream;
8   
9   import rx.Observable;
10  import rx.functions.Action1;
11  import rx.functions.Func0;
12  import rx.functions.Func1;
13  
14  public class Bytes {
15  
16  	public static Observable<byte[]> from(final InputStream is, final int size) {
17  		return new OnSubscribeInputStream(is, size).toObservable();
18  	}
19  
20  	public static Observable<byte[]> from(InputStream is) {
21  		return from(is, 8192);
22  	}
23  
24  	public static Observable<byte[]> from(final File file) {
25  		Func0<InputStream> resourceFactory = new Func0<InputStream>() {
26  			@Override
27  			public InputStream call() {
28  				try {
29  					return new FileInputStream(file);
30  				} catch (FileNotFoundException e) {
31  					throw new RuntimeException(e);
32  				}
33  			}
34  		};
35  		Func1<InputStream, Observable<byte[]>> observableFactory = new Func1<InputStream, Observable<byte[]>>() {
36  			@Override
37  			public Observable<byte[]> call(InputStream is) {
38  				return from(is);
39  			}
40  		};
41  		Action1<InputStream> disposeAction = new Action1<InputStream>() {
42  			@Override
43  			public void call(InputStream is) {
44  				try {
45  					is.close();
46  				} catch (IOException e) {
47  					e.printStackTrace();
48  				}
49  			}
50  		};
51  		return Observable.using(resourceFactory, observableFactory,
52  				disposeAction, true);
53  	}
54  }