1 package au.gov.amsa.util.rx;
2
3 import java.io.BufferedOutputStream;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.OutputStream;
8
9 import rx.Observable.Operator;
10 import rx.Observer;
11 import rx.Subscriber;
12 import rx.observers.Subscribers;
13
14 public class OperatorWriteBytes implements Operator<String, byte[]> {
15
16 private static final int DEFAULT_BUFFER_SIZE = 8192;
17 private final OutputStream out;
18 private final boolean closeOnTerminate;
19 private final File file;
20 private final boolean append;
21 private final int bufferSize;
22 private final boolean createTempFile;
23
24 private OperatorWriteBytes(File file, boolean createTempFile,
25 boolean append, OutputStream out, boolean closeOnTerminate,
26 int bufferSize) {
27 this.file = file;
28 this.createTempFile = createTempFile;
29 this.append = append;
30 this.out = out;
31 this.closeOnTerminate = closeOnTerminate;
32 this.bufferSize = bufferSize;
33 }
34
35 public OperatorWriteBytes(File file, boolean append) {
36 this(file, false, append, null, true, DEFAULT_BUFFER_SIZE);
37 }
38
39 public OperatorWriteBytes(File file, boolean append, int bufferSize) {
40 this(file, false, append, null, true, bufferSize);
41 }
42
43 public OperatorWriteBytes() {
44 this(null, true, true, null, true, DEFAULT_BUFFER_SIZE);
45 }
46
47 public OperatorWriteBytes(int bufferSize) {
48 this(null, true, true, null, true, bufferSize);
49 }
50
51 public OperatorWriteBytes(OutputStream out, boolean closeOnTerminate) {
52 this(null, false, true, out, closeOnTerminate, DEFAULT_BUFFER_SIZE);
53 }
54
55 public OperatorWriteBytes(OutputStream out, boolean closeOnTerminate,
56 int bufferSize) {
57 this(null, false, true, out, closeOnTerminate, DEFAULT_BUFFER_SIZE);
58 }
59
60 @SuppressWarnings("resource")
61 @Override
62 public Subscriber<? super byte[]> call(
63 final Subscriber<? super String> child) {
64
65
66 final OutputStream o;
67 final File actualFile;
68 if (file != null || createTempFile)
69 try {
70 if (createTempFile)
71 actualFile = File.createTempFile(
72 OperatorWriteBytes.class.getName(), ".bin");
73 else
74 actualFile = file;
75 o = new FileOutputStream(actualFile, append);
76 } catch (IOException e) {
77 child.onError(e);
78 return Subscribers.empty();
79 }
80 else {
81 o = out;
82 actualFile = null;
83 }
84 final OutputStream os = new BufferedOutputStream(o, bufferSize);
85 final Subscriber<byte[]> parent = Subscribers
86 .from(new Observer<byte[]>() {
87
88 @Override
89 public void onCompleted() {
90 if (closeOnTerminate)
91 try {
92 os.close();
93 if (actualFile != null)
94 child.onNext(actualFile.getPath());
95 child.onCompleted();
96 } catch (IOException e) {
97 child.onError(e);
98 }
99 else
100 child.onCompleted();
101 }
102
103 @Override
104 public void onError(Throwable e) {
105 if (closeOnTerminate)
106 try {
107 os.close();
108 child.onError(e);
109 } catch (IOException e2) {
110 child.onError(new CompositeException(e, e2));
111 }
112 else
113 child.onError(e);
114 }
115
116 @Override
117 public void onNext(byte[] bytes) {
118 try {
119 os.write(bytes);
120 } catch (IOException e) {
121 child.onError(e);
122 }
123 }
124 });
125 child.add(parent);
126 return parent;
127 }
128
129 }