1 package au.gov.amsa.streams;
2
3 import java.io.IOException;
4 import java.io.OutputStream;
5 import java.net.ServerSocket;
6 import java.net.Socket;
7 import java.net.SocketTimeoutException;
8 import java.nio.charset.StandardCharsets;
9
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import rx.Observable;
14 import rx.Subscriber;
15 import rx.internal.util.SubscriptionList;
16 import rx.schedulers.Schedulers;
17 import rx.subscriptions.Subscriptions;
18
19
20
21
22
23 public final class StringServer {
24
25 private static Logger log = LoggerFactory.getLogger(StringServer.class);
26
27 private final ServerSocket ss;
28 private volatile boolean keepGoing = true;
29 private final Observable<String> source;
30 private final SubscriptionList subscriptions = new SubscriptionList();
31
32
33
34
35
36
37
38
39
40 public static StringServer create(Observable<String> source, int port) {
41 return new StringServer(source, port);
42 }
43
44
45
46
47
48
49
50
51
52 private StringServer(Observable<String> source, int port) {
53 try {
54 this.ss = new ServerSocket(port);
55 subscriptions.add(Subscriptions.create(() -> closeServerSocket()));
56 } catch (IOException e) {
57 throw new RuntimeException(e);
58 }
59 this.source = source;
60 }
61
62
63
64
65
66 public void start() {
67 try {
68 while (keepGoing) {
69 try {
70
71 final Socket socket = ss.accept();
72 final String socketName = socket.getInetAddress().getHostAddress() + ":"
73 + socket.getPort();
74 log.info("accepted socket connection from " + socketName);
75 try {
76 final OutputStream out = socket.getOutputStream();
77
78 Subscriber<String> subscriber = createSubscriber(socket, socketName, out);
79 subscriptions.add(subscriber);
80 source.subscribeOn(Schedulers.io())
81
82 .doOnUnsubscribe(() -> subscriptions.remove(subscriber))
83
84 .subscribe(subscriber);
85
86 } catch (IOException e) {
87
88
89
90 log.warn(e.getClass().getSimpleName() + ": " + e.getMessage());
91 }
92 } catch (SocketTimeoutException e) {
93
94 log.warn(e.getClass().getSimpleName() + ": " + e.getMessage());
95 }
96 }
97 } catch (IOException e) {
98 if (keepGoing) {
99 log.warn(e.getMessage(), e);
100 throw new RuntimeException(e);
101 } else
102 log.info("server stopped");
103 } finally {
104 closeServerSocket();
105 }
106 }
107
108
109
110
111 public void stop() {
112 keepGoing = false;
113 subscriptions.unsubscribe();
114 }
115
116 private void closeServerSocket() {
117 log.info("stopping string server socket on port " + ss.getLocalPort());
118 try {
119 ss.close();
120 } catch (IOException e) {
121 log.info("could not close server socket: " + e.getMessage());
122 }
123 }
124
125 private static Subscriber<String> createSubscriber(final Socket socket, final String socketName,
126 final OutputStream out) {
127 return new Subscriber<String>() {
128
129 @Override
130 public void onCompleted() {
131 log.info("stream completed");
132 closeSocket();
133 }
134
135 @Override
136 public void onError(Throwable e) {
137 log.error(e.getMessage() + " - unexpected due to upstream retry");
138 closeSocket();
139 }
140
141 @Override
142 public void onNext(String line) {
143 try {
144 out.write(line.getBytes(StandardCharsets.UTF_8));
145 out.flush();
146 } catch (IOException e) {
147 log.info(e.getMessage() + " " + socketName);
148
149
150 unsubscribe();
151 closeSocket();
152 }
153 }
154
155 private void closeSocket() {
156 try {
157 socket.close();
158 } catch (IOException e1) {
159 log.info("closing socket " + socketName + ":" + e1.getMessage());
160 }
161 }
162 };
163 }
164
165 @Override
166 public String toString() {
167 StringBuilder b = new StringBuilder();
168 b.append("StringServer [port=");
169 b.append(ss.getLocalPort());
170 b.append("]");
171 return b.toString();
172 }
173
174 }