View Javadoc
1   package au.gov.amsa.util.rx;
2   
3   import java.io.BufferedOutputStream;
4   import java.io.File;
5   import java.io.FileOutputStream;
6   import java.io.IOException;
7   import java.io.OutputStream;
8   
9   import rx.Observable.Operator;
10  import rx.Observer;
11  import rx.Subscriber;
12  import rx.observers.Subscribers;
13  
14  public class OperatorWriteBytes implements Operator<String, byte[]> {
15  
16  	private static final int DEFAULT_BUFFER_SIZE = 8192;
17  	private final OutputStream out;
18  	private final boolean closeOnTerminate;
19  	private final File file;
20  	private final boolean append;
21  	private final int bufferSize;
22  	private final boolean createTempFile;
23  
24  	private OperatorWriteBytes(File file, boolean createTempFile,
25  			boolean append, OutputStream out, boolean closeOnTerminate,
26  			int bufferSize) {
27  		this.file = file;
28  		this.createTempFile = createTempFile;
29  		this.append = append;
30  		this.out = out;
31  		this.closeOnTerminate = closeOnTerminate;
32  		this.bufferSize = bufferSize;
33  	}
34  
35  	public OperatorWriteBytes(File file, boolean append) {
36  		this(file, false, append, null, true, DEFAULT_BUFFER_SIZE);
37  	}
38  
39  	public OperatorWriteBytes(File file, boolean append, int bufferSize) {
40  		this(file, false, append, null, true, bufferSize);
41  	}
42  
43  	public OperatorWriteBytes() {
44  		this(null, true, true, null, true, DEFAULT_BUFFER_SIZE);
45  	}
46  
47  	public OperatorWriteBytes(int bufferSize) {
48  		this(null, true, true, null, true, bufferSize);
49  	}
50  
51  	public OperatorWriteBytes(OutputStream out, boolean closeOnTerminate) {
52  		this(null, false, true, out, closeOnTerminate, DEFAULT_BUFFER_SIZE);
53  	}
54  
55  	public OperatorWriteBytes(OutputStream out, boolean closeOnTerminate,
56  			int bufferSize) {
57  		this(null, false, true, out, closeOnTerminate, DEFAULT_BUFFER_SIZE);
58  	}
59  
60  	@SuppressWarnings("resource")
61  	@Override
62  	public Subscriber<? super byte[]> call(
63  			final Subscriber<? super String> child) {
64  
65  		// TODO prevent multiple active subscribers
66  		final OutputStream o;
67  		final File actualFile;
68  		if (file != null || createTempFile)
69  			try {
70  				if (createTempFile)
71  					actualFile = File.createTempFile(
72  							OperatorWriteBytes.class.getName(), ".bin");
73  				else
74  					actualFile = file;
75  				o = new FileOutputStream(actualFile, append);
76  			} catch (IOException e) {
77  				child.onError(e);
78  				return Subscribers.empty();
79  			}
80  		else {
81  			o = out;
82  			actualFile = null;
83  		}
84  		final OutputStream os = new BufferedOutputStream(o, bufferSize);
85  		final Subscriber<byte[]> parent = Subscribers
86  				.from(new Observer<byte[]>() {
87  
88  					@Override
89  					public void onCompleted() {
90  						if (closeOnTerminate)
91  							try {
92  								os.close();
93  								if (actualFile != null)
94  									child.onNext(actualFile.getPath());
95  								child.onCompleted();
96  							} catch (IOException e) {
97  								child.onError(e);
98  							}
99  						else
100 							child.onCompleted();
101 					}
102 
103 					@Override
104 					public void onError(Throwable e) {
105 						if (closeOnTerminate)
106 							try {
107 								os.close();
108 								child.onError(e);
109 							} catch (IOException e2) {
110 								child.onError(new CompositeException(e, e2));
111 							}
112 						else
113 							child.onError(e);
114 					}
115 
116 					@Override
117 					public void onNext(byte[] bytes) {
118 						try {
119 							os.write(bytes);
120 						} catch (IOException e) {
121 							child.onError(e);
122 						}
123 					}
124 				});
125 		child.add(parent);
126 		return parent;
127 	}
128 
129 }