1 package au.gov.amsa.streams;
2
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.FileNotFoundException;
6 import java.io.IOException;
7 import java.io.InputStream;
8
9 import rx.Observable;
10 import rx.functions.Action1;
11 import rx.functions.Func0;
12 import rx.functions.Func1;
13
14 public class Bytes {
15
16 public static Observable<byte[]> from(final InputStream is, final int size) {
17 return new OnSubscribeInputStream(is, size).toObservable();
18 }
19
20 public static Observable<byte[]> from(InputStream is) {
21 return from(is, 8192);
22 }
23
24 public static Observable<byte[]> from(final File file) {
25 Func0<InputStream> resourceFactory = new Func0<InputStream>() {
26 @Override
27 public InputStream call() {
28 try {
29 return new FileInputStream(file);
30 } catch (FileNotFoundException e) {
31 throw new RuntimeException(e);
32 }
33 }
34 };
35 Func1<InputStream, Observable<byte[]>> observableFactory = new Func1<InputStream, Observable<byte[]>>() {
36 @Override
37 public Observable<byte[]> call(InputStream is) {
38 return from(is);
39 }
40 };
41 Action1<InputStream> disposeAction = new Action1<InputStream>() {
42 @Override
43 public void call(InputStream is) {
44 try {
45 is.close();
46 } catch (IOException e) {
47 e.printStackTrace();
48 }
49 }
50 };
51 return Observable.using(resourceFactory, observableFactory,
52 disposeAction, true);
53 }
54 }