1 package au.gov.amsa.streams; 2 3 import java.io.File; 4 import java.io.FileInputStream; 5 import java.io.FileNotFoundException; 6 import java.io.IOException; 7 import java.io.InputStream; 8 9 import rx.Observable; 10 import rx.functions.Action1; 11 import rx.functions.Func0; 12 import rx.functions.Func1; 13 14 public class Bytes { 15 16 public static Observable<byte[]> from(final InputStream is, final int size) { 17 return new OnSubscribeInputStream(is, size).toObservable(); 18 } 19 20 public static Observable<byte[]> from(InputStream is) { 21 return from(is, 8192); 22 } 23 24 public static Observable<byte[]> from(final File file) { 25 Func0<InputStream> resourceFactory = new Func0<InputStream>() { 26 @Override 27 public InputStream call() { 28 try { 29 return new FileInputStream(file); 30 } catch (FileNotFoundException e) { 31 throw new RuntimeException(e); 32 } 33 } 34 }; 35 Func1<InputStream, Observable<byte[]>> observableFactory = new Func1<InputStream, Observable<byte[]>>() { 36 @Override 37 public Observable<byte[]> call(InputStream is) { 38 return from(is); 39 } 40 }; 41 Action1<InputStream> disposeAction = new Action1<InputStream>() { 42 @Override 43 public void call(InputStream is) { 44 try { 45 is.close(); 46 } catch (IOException e) { 47 e.printStackTrace(); 48 } 49 } 50 }; 51 return Observable.using(resourceFactory, observableFactory, 52 disposeAction, true); 53 } 54 }