View Javadoc
1   package au.gov.amsa.streams;
2   
3   import java.io.IOException;
4   import java.io.InputStreamReader;
5   import java.net.Socket;
6   import java.nio.charset.Charset;
7   import java.nio.charset.StandardCharsets;
8   import java.util.concurrent.TimeUnit;
9   
10  import org.slf4j.Logger;
11  import org.slf4j.LoggerFactory;
12  
13  import com.github.davidmoten.rx.Checked;
14  import com.github.davidmoten.rx.RetryWhen;
15  import com.github.davidmoten.util.Preconditions;
16  import com.google.common.annotations.VisibleForTesting;
17  
18  import rx.Observable;
19  import rx.Scheduler;
20  import rx.functions.Action1;
21  import rx.functions.Func0;
22  import rx.functions.Func1;
23  import rx.schedulers.Schedulers;
24  
25  public final class StringSockets {
26  
27      private static final Logger log = LoggerFactory.getLogger(StringSockets.class);
28  
29      /**
30       * Returns an Observable sequence of strings (not lines) from the given host
31       * and port. If the stream is quiet for <code>quietTimeoutMs</code> then a
32       * reconnect will be attempted. Note that this is a good idea with TCPIP
33       * connections as for instance a firewall can simply drop a quiet connection
34       * without the client being aware of it. If any exception occurs a reconnect
35       * will be attempted after <code>reconnectDelayMs</code>. If the socket is
36       * closed by the server (the end of the input stream is reached) then a
37       * reconnect is attempted after <code>reconnectDelayMs</code>.
38       * 
39       * @param hostPort
40       * @return Observable sequence of strings (not lines).
41       */
42      public static Observable<String> from(final String host, final int port, long quietTimeoutMs,
43              long reconnectDelayMs, Charset charset, Scheduler scheduler) {
44          // delay connect by delayMs so that if server closes
45          // stream on every connect we won't be in a mad loop of
46          // failing connections
47          return from(socketCreator(host, port, (int) quietTimeoutMs), quietTimeoutMs,
48                  reconnectDelayMs, charset, scheduler); //
49      }
50  
51      public static Observable<String> from(Func0<Socket> socketCreator, long quietTimeoutMs,
52              long reconnectDelayMs, Charset charset, Scheduler scheduler) {
53          return strings(socketCreator, charset) //
54                  // additional timeout appears to be necessary for certain use
55                  // cases like when the server side does not close the socket
56                  .timeout(quietTimeoutMs + 100, TimeUnit.MILLISECONDS) //
57                  .subscribeOn(scheduler) //
58                  // if any exception occurs retry
59                  .retryWhen(RetryWhen //
60                          .delay(reconnectDelayMs, TimeUnit.MILLISECONDS) //
61                          .build()) //
62                  // all subscribers use the same stream
63                  .share();
64      }
65  
66      public static class Builder {
67          private final String host;
68          private int port = 6564;
69          private long quietTimeoutMs = 60000;
70          private long reconnectDelayMs = 30000;
71          private Charset charset = StandardCharsets.UTF_8;
72          private Scheduler scheduler = Schedulers.io();
73  
74          Builder(String host) {
75              this.host = host;
76          }
77  
78          public Builder port(int port) {
79              this.port = port;
80              return this;
81          }
82  
83          public Builder quietTimeoutMs(long quietTimeoutMs) {
84              this.quietTimeoutMs = quietTimeoutMs;
85              return this;
86          }
87  
88          public Builder quietTimeout(long duration, TimeUnit unit) {
89              return quietTimeoutMs(unit.toMillis(duration));
90          }
91  
92          public Builder reconnectDelayMs(long reconnectDelayMs) {
93              this.reconnectDelayMs = reconnectDelayMs;
94              return this;
95          }
96  
97          public Builder reconnectDelay(long duration, TimeUnit unit) {
98              return reconnectDelayMs(unit.toMillis(duration));
99          }
100 
101         public Builder charset(Charset charset) {
102             this.charset = charset;
103             return this;
104         }
105 
106         public Builder subscribeOn(Scheduler scheduler) {
107             this.scheduler = scheduler;
108             return this;
109         }
110 
111         public Observable<String> create() {
112             return from(host, port, quietTimeoutMs, reconnectDelayMs, charset, scheduler);
113         }
114 
115     }
116 
117     /**
118      * Returns a builder for converting a socket read to an Observable. Defaults
119      * to port=6564, quietTimeoutMs=60000, reconnectDelayMs=30000, charset=
120      * UTF-8.
121      * 
122      * @param host
123      * @return
124      */
125     public static Builder from(String host) {
126         return new Builder(host);
127     }
128 
129     public static Observable<String> strings(final String host, final int port, int quietTimeoutMs,
130             final Charset charset) {
131         Preconditions.checkNotNull(host);
132         Preconditions.checkArgument(port >= 0 && port <= 65535, "port must be between 0 and 65535");
133         Preconditions.checkArgument(quietTimeoutMs > 0, "quietTimeoutMs must be > 0");
134         Preconditions.checkNotNull(charset);
135         return strings(socketCreator(host, port, quietTimeoutMs), charset);
136     }
137 
138     public static Observable<String> strings(Func0<Socket> socketCreator, final Charset charset) {
139         Preconditions.checkNotNull(socketCreator);
140         Preconditions.checkNotNull(charset);
141         return Observable
142                 // create a stream from a socket and dispose of socket
143                 // appropriately
144                 .using(socketCreator, socketObservableFactory(charset), socketDisposer(), true);
145     }
146 
147     @VisibleForTesting
148     static Func0<Socket> socketCreator(final String host, final int port, long quietTimeoutMs) {
149         return Checked.f0(() -> {
150             Socket socket = new Socket(host, port);
151             socket.setSoTimeout((int) quietTimeoutMs);
152             return socket;
153         });
154     }
155 
156     @VisibleForTesting
157     static Func1<Socket, Observable<String>> socketObservableFactory(final Charset charset) {
158         return Checked.f1(
159                 socket -> Strings.from(new InputStreamReader(socket.getInputStream(), charset)));
160     }
161 
162     @VisibleForTesting
163     static Action1<Socket> socketDisposer() {
164         return socket -> {
165             try {
166                 log.info("closing socket " + socket.getInetAddress().getHostAddress() + ":"
167                         + socket.getPort());
168                 socket.close();
169             } catch (IOException e) {
170                 // don't really care if socket could not be closed cleanly
171                 log.info("messageOnSocketClose=" + e.getMessage(), e);
172             }
173         };
174     }
175 
176     public static Observable<String> mergeLinesFrom(Observable<HostPort> hostPorts,
177             Scheduler scheduler) {
178         return hostPorts
179                 //
180                 .map(hp -> StringSockets
181                         .from(hp.getHost(), hp.getPort(), hp.getQuietTimeoutMs(),
182                                 hp.getReconnectDelayMs(), StandardCharsets.UTF_8, scheduler)
183                         // split by new line character
184                         .compose(o -> Strings.split(o, "\n")))
185                 // merge streams of lines
186                 .compose(o -> Observable.merge(o));
187     }
188 
189     public static Observable<String> mergeLinesFrom(Observable<HostPort> hostPorts, int maxLineLength) {
190         return mergeLinesFrom(hostPorts, Schedulers.io(), maxLineLength);
191     }
192     
193     public static Observable<String> mergeLinesFrom(Observable<HostPort> hostPorts,
194             Scheduler scheduler, int maxLineLength) {
195         return hostPorts
196                 //
197                 .map(hp -> StringSockets
198                         .from(hp.getHost(), hp.getPort(), hp.getQuietTimeoutMs(),
199                                 hp.getReconnectDelayMs(), StandardCharsets.UTF_8, scheduler)
200                         // split by new line character
201                         .compose(o -> Strings.split(o, maxLineLength,  "\n", 1)))
202                 // merge streams of lines
203                 .compose(o -> Observable.merge(o));
204     }
205 
206     public static Observable<String> mergeLinesFrom(Observable<HostPort> hostPorts) {
207         return mergeLinesFrom(hostPorts, Schedulers.io());
208     }
209 
210     public static void main(String[] args) {
211         Observable<HostPort> hostPorts = Observable.just(
212                 HostPort.create("sarapps", 9010, 1000, 1000),
213                 HostPort.create("sarapps", 9100, 1000, 1000));
214         StringSockets.mergeLinesFrom(hostPorts, Schedulers.io()).doOnNext(System.out::println)
215                 .toBlocking().last();
216     }
217 }