1 package au.gov.amsa.streams;
2
3 import java.io.IOException;
4 import java.io.InputStreamReader;
5 import java.net.Socket;
6 import java.nio.charset.Charset;
7 import java.nio.charset.StandardCharsets;
8 import java.util.concurrent.TimeUnit;
9
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import com.github.davidmoten.rx.Checked;
14 import com.github.davidmoten.rx.RetryWhen;
15 import com.github.davidmoten.util.Preconditions;
16 import com.google.common.annotations.VisibleForTesting;
17
18 import rx.Observable;
19 import rx.Scheduler;
20 import rx.functions.Action1;
21 import rx.functions.Func0;
22 import rx.functions.Func1;
23 import rx.schedulers.Schedulers;
24
25 public final class StringSockets {
26
27 private static final Logger log = LoggerFactory.getLogger(StringSockets.class);
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 public static Observable<String> from(final String host, final int port, long quietTimeoutMs,
43 long reconnectDelayMs, Charset charset, Scheduler scheduler) {
44
45
46
47 return from(socketCreator(host, port, (int) quietTimeoutMs), quietTimeoutMs,
48 reconnectDelayMs, charset, scheduler);
49 }
50
51 public static Observable<String> from(Func0<Socket> socketCreator, long quietTimeoutMs,
52 long reconnectDelayMs, Charset charset, Scheduler scheduler) {
53 return strings(socketCreator, charset)
54
55
56 .timeout(quietTimeoutMs + 100, TimeUnit.MILLISECONDS)
57 .subscribeOn(scheduler)
58
59 .retryWhen(RetryWhen
60 .delay(reconnectDelayMs, TimeUnit.MILLISECONDS)
61 .build())
62
63 .share();
64 }
65
66 public static class Builder {
67 private final String host;
68 private int port = 6564;
69 private long quietTimeoutMs = 60000;
70 private long reconnectDelayMs = 30000;
71 private Charset charset = StandardCharsets.UTF_8;
72 private Scheduler scheduler = Schedulers.io();
73
74 Builder(String host) {
75 this.host = host;
76 }
77
78 public Builder port(int port) {
79 this.port = port;
80 return this;
81 }
82
83 public Builder quietTimeoutMs(long quietTimeoutMs) {
84 this.quietTimeoutMs = quietTimeoutMs;
85 return this;
86 }
87
88 public Builder quietTimeout(long duration, TimeUnit unit) {
89 return quietTimeoutMs(unit.toMillis(duration));
90 }
91
92 public Builder reconnectDelayMs(long reconnectDelayMs) {
93 this.reconnectDelayMs = reconnectDelayMs;
94 return this;
95 }
96
97 public Builder reconnectDelay(long duration, TimeUnit unit) {
98 return reconnectDelayMs(unit.toMillis(duration));
99 }
100
101 public Builder charset(Charset charset) {
102 this.charset = charset;
103 return this;
104 }
105
106 public Builder subscribeOn(Scheduler scheduler) {
107 this.scheduler = scheduler;
108 return this;
109 }
110
111 public Observable<String> create() {
112 return from(host, port, quietTimeoutMs, reconnectDelayMs, charset, scheduler);
113 }
114
115 }
116
117
118
119
120
121
122
123
124
125 public static Builder from(String host) {
126 return new Builder(host);
127 }
128
129 public static Observable<String> strings(final String host, final int port, int quietTimeoutMs,
130 final Charset charset) {
131 Preconditions.checkNotNull(host);
132 Preconditions.checkArgument(port >= 0 && port <= 65535, "port must be between 0 and 65535");
133 Preconditions.checkArgument(quietTimeoutMs > 0, "quietTimeoutMs must be > 0");
134 Preconditions.checkNotNull(charset);
135 return strings(socketCreator(host, port, quietTimeoutMs), charset);
136 }
137
138 public static Observable<String> strings(Func0<Socket> socketCreator, final Charset charset) {
139 Preconditions.checkNotNull(socketCreator);
140 Preconditions.checkNotNull(charset);
141 return Observable
142
143
144 .using(socketCreator, socketObservableFactory(charset), socketDisposer(), true);
145 }
146
147 @VisibleForTesting
148 static Func0<Socket> socketCreator(final String host, final int port, long quietTimeoutMs) {
149 return Checked.f0(() -> {
150 Socket socket = new Socket(host, port);
151 socket.setSoTimeout((int) quietTimeoutMs);
152 return socket;
153 });
154 }
155
156 @VisibleForTesting
157 static Func1<Socket, Observable<String>> socketObservableFactory(final Charset charset) {
158 return Checked.f1(
159 socket -> Strings.from(new InputStreamReader(socket.getInputStream(), charset)));
160 }
161
162 @VisibleForTesting
163 static Action1<Socket> socketDisposer() {
164 return socket -> {
165 try {
166 log.info("closing socket " + socket.getInetAddress().getHostAddress() + ":"
167 + socket.getPort());
168 socket.close();
169 } catch (IOException e) {
170
171 log.info("messageOnSocketClose=" + e.getMessage(), e);
172 }
173 };
174 }
175
176 public static Observable<String> mergeLinesFrom(Observable<HostPort> hostPorts,
177 Scheduler scheduler) {
178 return hostPorts
179
180 .map(hp -> StringSockets
181 .from(hp.getHost(), hp.getPort(), hp.getQuietTimeoutMs(),
182 hp.getReconnectDelayMs(), StandardCharsets.UTF_8, scheduler)
183
184 .compose(o -> Strings.split(o, "\n")))
185
186 .compose(o -> Observable.merge(o));
187 }
188
189 public static Observable<String> mergeLinesFrom(Observable<HostPort> hostPorts, int maxLineLength) {
190 return mergeLinesFrom(hostPorts, Schedulers.io(), maxLineLength);
191 }
192
193 public static Observable<String> mergeLinesFrom(Observable<HostPort> hostPorts,
194 Scheduler scheduler, int maxLineLength) {
195 return hostPorts
196
197 .map(hp -> StringSockets
198 .from(hp.getHost(), hp.getPort(), hp.getQuietTimeoutMs(),
199 hp.getReconnectDelayMs(), StandardCharsets.UTF_8, scheduler)
200
201 .compose(o -> Strings.split(o, maxLineLength, "\n", 1)))
202
203 .compose(o -> Observable.merge(o));
204 }
205
206 public static Observable<String> mergeLinesFrom(Observable<HostPort> hostPorts) {
207 return mergeLinesFrom(hostPorts, Schedulers.io());
208 }
209
210 public static void main(String[] args) {
211 Observable<HostPort> hostPorts = Observable.just(
212 HostPort.create("sarapps", 9010, 1000, 1000),
213 HostPort.create("sarapps", 9100, 1000, 1000));
214 StringSockets.mergeLinesFrom(hostPorts, Schedulers.io()).doOnNext(System.out::println)
215 .toBlocking().last();
216 }
217 }