View Javadoc
1   package au.gov.amsa.streams;
2   
3   import java.io.IOException;
4   import java.io.OutputStream;
5   import java.net.ServerSocket;
6   import java.net.Socket;
7   import java.net.SocketTimeoutException;
8   import java.nio.charset.StandardCharsets;
9   
10  import org.slf4j.Logger;
11  import org.slf4j.LoggerFactory;
12  
13  import rx.Observable;
14  import rx.Subscriber;
15  import rx.internal.util.SubscriptionList;
16  import rx.schedulers.Schedulers;
17  import rx.subscriptions.Subscriptions;
18  
19  /**
20   * Publishes lines from an Observable<String> source to a
21   * {@link ServerSocket}.
22   */
23  public final class StringServer {
24  
25      private static Logger log = LoggerFactory.getLogger(StringServer.class);
26  
27      private final ServerSocket ss;
28      private volatile boolean keepGoing = true;
29      private final Observable<String> source;
30      private final SubscriptionList subscriptions = new SubscriptionList();
31  
32      /**
33       * Factory method.
34       * 
35       * @param source
36       *            source to publish on server socket
37       * @param port
38       *            to assign the server socket to
39       */
40      public static StringServer create(Observable<String> source, int port) {
41          return new StringServer(source, port);
42      }
43  
44      /**
45       * Constructor.
46       * 
47       * @param ss
48       *            {@link ServerSocket} to publish to
49       * @param source
50       *            the source of lines to publish on ServerSocket
51       */
52      private StringServer(Observable<String> source, int port) {
53          try {
54              this.ss = new ServerSocket(port);
55              subscriptions.add(Subscriptions.create(() -> closeServerSocket()));
56          } catch (IOException e) {
57              throw new RuntimeException(e);
58          }
59          this.source = source;
60      }
61  
62      /**
63       * Starts the server. Each connection to the server will bring about another
64       * subscription to the source.
65       */
66      public void start() {
67          try {
68              while (keepGoing) {
69                  try {
70                      // this is a blocking call so it hogs a thread
71                      final Socket socket = ss.accept();
72                      final String socketName = socket.getInetAddress().getHostAddress() + ":"
73                              + socket.getPort();
74                      log.info("accepted socket connection from " + socketName);
75                      try {
76                          final OutputStream out = socket.getOutputStream();
77  
78                          Subscriber<String> subscriber = createSubscriber(socket, socketName, out);
79                          subscriptions.add(subscriber);
80                          source.subscribeOn(Schedulers.io())
81                                  // remove subscriber from subscriptions on unsub
82                                  .doOnUnsubscribe(() -> subscriptions.remove(subscriber))
83                                  // write each line to the socket OutputStream
84                                  .subscribe(subscriber);
85  
86                      } catch (IOException e) {
87                          // could not get output stream (could have closed very
88                          // quickly after connecting)
89                          // dont' care
90                          log.warn(e.getClass().getSimpleName() + ": " + e.getMessage());
91                      }
92                  } catch (SocketTimeoutException e) {
93                      // don't care
94                      log.warn(e.getClass().getSimpleName() + ": " + e.getMessage());
95                  }
96              }
97          } catch (IOException e) {
98              if (keepGoing) {
99                  log.warn(e.getMessage(), e);
100                 throw new RuntimeException(e);
101             } else
102                 log.info("server stopped");
103         } finally {
104             closeServerSocket();
105         }
106     }
107 
108     /**
109      * Stops the server by closing the ServerSocket.
110      */
111     public void stop() {
112         keepGoing = false;
113         subscriptions.unsubscribe();
114     }
115 
116     private void closeServerSocket() {
117         log.info("stopping string server socket on port " + ss.getLocalPort());
118         try {
119             ss.close();
120         } catch (IOException e) {
121             log.info("could not close server socket: " + e.getMessage());
122         }
123     }
124 
125     private static Subscriber<String> createSubscriber(final Socket socket, final String socketName,
126             final OutputStream out) {
127         return new Subscriber<String>() {
128 
129             @Override
130             public void onCompleted() {
131                 log.info("stream completed");
132                 closeSocket();
133             }
134 
135             @Override
136             public void onError(Throwable e) {
137                 log.error(e.getMessage() + " - unexpected due to upstream retry");
138                 closeSocket();
139             }
140 
141             @Override
142             public void onNext(String line) {
143                 try {
144                     out.write(line.getBytes(StandardCharsets.UTF_8));
145                     out.flush();
146                 } catch (IOException e) {
147                     log.info(e.getMessage() + " " + socketName);
148                     // this will unsubscribe to clean up the
149                     // resources associated with this subscription
150                     unsubscribe();
151                     closeSocket();
152                 }
153             }
154 
155             private void closeSocket() {
156                 try {
157                     socket.close();
158                 } catch (IOException e1) {
159                     log.info("closing socket " + socketName + ":" + e1.getMessage());
160                 }
161             }
162         };
163     }
164 
165     @Override
166     public String toString() {
167         StringBuilder b = new StringBuilder();
168         b.append("StringServer [port=");
169         b.append(ss.getLocalPort());
170         b.append("]");
171         return b.toString();
172     }
173 
174 }