1 package au.gov.amsa.ais.rx;
2
3 import java.io.BufferedReader;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.io.InputStreamReader;
7 import java.net.Socket;
8 import java.net.UnknownHostException;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 import rx.Observer;
16
17 public class SocketReaderRunnable implements Runnable {
18
19 private static Logger log = LoggerFactory.getLogger(SocketReaderRunnable.class);
20
21 private final AtomicBoolean keepGoing = new AtomicBoolean(true);
22 private final AtomicReference<Socket> socket = new AtomicReference<Socket>(
23 null);
24 private final Observer<? super String> observer;
25
26 private final AtomicReference<BufferedReader> reader = new AtomicReference<BufferedReader>();
27 private final Object lock = new Object();
28
29 private final HostPort hostPort;
30
31 public SocketReaderRunnable(HostPort hostPort,
32 Observer<? super String> observer) {
33 this.hostPort = hostPort;
34 this.observer = observer;
35 }
36
37 @Override
38 public void run() {
39 try {
40 log.info("creating new socket");
41 synchronized (lock) {
42 socket.set(createSocket(hostPort.getHost(), hostPort.getPort()));
43 }
44 log.info("waiting one second before attempting connect");
45 Thread.sleep(1000);
46 InputStream is = socket.get().getInputStream();
47 BufferedReader br;
48 synchronized (lock) {
49 br = new BufferedReader(new InputStreamReader(is));
50 reader.set(br);
51 }
52 while (keepGoing.get()) {
53 final String line;
54 synchronized (lock) {
55 if (keepGoing.get())
56 line = br.readLine();
57 else
58 line = null;
59 }
60 if (line != null)
61 try {
62 observer.onNext(line);
63 } catch (RuntimeException e) {
64 log.warn(e.getMessage(), e);
65 }
66 else
67 keepGoing.set(false);
68 }
69 observer.onCompleted();
70 log.info("completed");
71 } catch (Exception e) {
72 log.warn(e.getMessage(), e);
73 observer.onError(e);
74 }
75 }
76
77 public void cancel() {
78 log.info("cancelling socket read");
79 synchronized (lock) {
80 keepGoing.set(false);
81
82
83
84 if (socket.get() != null) {
85 if (reader.get() != null)
86 try {
87 reader.get().close();
88 } catch (IOException e) {
89
90 }
91 try {
92 socket.get().close();
93 socket.set(null);
94 } catch (IOException e) {
95
96 }
97 }
98 }
99 }
100
101 public boolean isCancelled() {
102 return keepGoing.get();
103 }
104
105 private static Socket createSocket(final String host, final int port) {
106 try {
107 return new Socket(host, port);
108 } catch (UnknownHostException e) {
109 throw new RuntimeException(e);
110 } catch (IOException e) {
111 throw new RuntimeException(e);
112 }
113 }
114
115 }