View Javadoc
1   package au.gov.amsa.ais.rx;
2   
3   import java.io.BufferedReader;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.io.InputStreamReader;
7   import java.net.Socket;
8   import java.net.UnknownHostException;
9   import java.util.concurrent.atomic.AtomicBoolean;
10  import java.util.concurrent.atomic.AtomicReference;
11  
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import rx.Observer;
16  
17  public class SocketReaderRunnable implements Runnable {
18  
19  	private static Logger log = LoggerFactory.getLogger(SocketReaderRunnable.class);
20  
21  	private final AtomicBoolean keepGoing = new AtomicBoolean(true);
22  	private final AtomicReference<Socket> socket = new AtomicReference<Socket>(
23  			null);
24  	private final Observer<? super String> observer;
25  
26  	private final AtomicReference<BufferedReader> reader = new AtomicReference<BufferedReader>();
27  	private final Object lock = new Object();
28  
29  	private final HostPort hostPort;
30  
31  	public SocketReaderRunnable(HostPort hostPort,
32  			Observer<? super String> observer) {
33  		this.hostPort = hostPort;
34  		this.observer = observer;
35  	}
36  
37  	@Override
38  	public void run() {
39  		try {
40  			log.info("creating new socket");
41  			synchronized (lock) {
42  				socket.set(createSocket(hostPort.getHost(), hostPort.getPort()));
43  			}
44  			log.info("waiting one second before attempting connect");
45  			Thread.sleep(1000);
46  			InputStream is = socket.get().getInputStream();
47  			BufferedReader br;
48  			synchronized (lock) {
49  				br = new BufferedReader(new InputStreamReader(is));
50  				reader.set(br);
51  			}
52  			while (keepGoing.get()) {
53  				final String line;
54  				synchronized (lock) {
55  					if (keepGoing.get())
56  						line = br.readLine();
57  					else
58  						line = null;
59  				}
60  				if (line != null)
61  					try {
62  						observer.onNext(line);
63  					} catch (RuntimeException e) {
64  						log.warn(e.getMessage(), e);
65  					}
66  				else
67  					keepGoing.set(false);
68  			}
69  			observer.onCompleted();
70  			log.info("completed");
71  		} catch (Exception e) {
72  			log.warn(e.getMessage(), e);
73  			observer.onError(e);
74  		}
75  	}
76  
77  	public void cancel() {
78  		log.info("cancelling socket read");
79  		synchronized (lock) {
80  			keepGoing.set(false);
81  			// only allow socket to be closed once because a fresh
82  			// instance of Socket could have been opened to the same host and
83  			// port and we don't want to mess with it.
84  			if (socket.get() != null) {
85  				if (reader.get() != null)
86  					try {
87  						reader.get().close();
88  					} catch (IOException e) {
89  						// ignore
90  					}
91  				try {
92  					socket.get().close();
93  					socket.set(null);
94  				} catch (IOException e) {
95  					// ignore
96  				}
97  			}
98  		}
99  	}
100 
101 	public boolean isCancelled() {
102 		return keepGoing.get();
103 	}
104 
105 	private static Socket createSocket(final String host, final int port) {
106 		try {
107 			return new Socket(host, port);
108 		} catch (UnknownHostException e) {
109 			throw new RuntimeException(e);
110 		} catch (IOException e) {
111 			throw new RuntimeException(e);
112 		}
113 	}
114 
115 }