1 package au.gov.amsa.ais.router.model;
2
3 import java.nio.charset.StandardCharsets;
4 import java.util.Optional;
5
6 import com.github.davidmoten.guavamini.Preconditions;
7 import com.github.davidmoten.rx.Transformers;
8
9 import au.gov.amsa.streams.StringSockets;
10 import rx.Observable;
11
12 public final class Connection implements GroupMember {
13
14 private final String id;
15 private final String host;
16 private final int port;
17 private final boolean ssl;
18 private final Optional<Authentication> authentication;
19 private final int readTimeoutMs;
20 private final long retryIntervalMs;
21 private final boolean enabled;
22 private final Optional<Proxy> proxy;
23 private final Observable<String> lines;
24
25 private Connection(String id, String host, int port, boolean ssl,
26 Optional<Authentication> authentication, int readTimeoutMs, long retryIntervalMs,
27 boolean enabled, Optional<Proxy> proxy) {
28 Util.verifyId(id);
29 Preconditions.checkNotNull(host);
30 Preconditions.checkArgument(port > 0 && port <= 65535, "port must be between 0 and 65535");
31 Preconditions.checkNotNull(authentication);
32 Preconditions.checkArgument(readTimeoutMs >= 0, "readTimeMs must be >=0");
33 Preconditions.checkArgument(retryIntervalMs >= 0, "retryIntervalMs must be >=0");
34 Preconditions.checkNotNull(proxy);
35 this.id = id;
36 this.host = host;
37 this.port = port;
38 this.ssl = ssl;
39 this.authentication = authentication;
40 this.readTimeoutMs = readTimeoutMs;
41 this.retryIntervalMs = retryIntervalMs;
42 this.enabled = enabled;
43 this.proxy = proxy;
44
45 this.lines = StringSockets.from(host).charset(StandardCharsets.UTF_8).port(port)
46 .quietTimeoutMs(readTimeoutMs).reconnectDelayMs(retryIntervalMs).create()
47 .compose(Transformers.split("\n")).map(s -> s.trim()).share();
48 }
49
50 public String id() {
51 return id;
52 }
53
54 public String host() {
55 return host;
56 }
57
58 public int port() {
59 return port;
60 }
61
62 public boolean ssl() {
63 return ssl;
64 }
65
66 public Optional<Authentication> authentication() {
67 return authentication;
68 }
69
70 public int readTimeoutMs() {
71 return readTimeoutMs;
72 }
73
74 public long retryIntervalMs() {
75 return retryIntervalMs;
76 }
77
78 public boolean enabled() {
79 return enabled;
80 }
81
82 public static Builder builder() {
83 return new Builder();
84 }
85
86 public static class Builder {
87
88 private String id;
89 private String host;
90 private int port;
91 private boolean ssl;
92 private Optional<Authentication> authentication = Optional.empty();
93 private int readTimeoutMs;
94 private long retryIntervalMs;
95 private boolean enabled = true;
96 private Optional<Proxy> proxy = Optional.empty();
97
98 private Builder() {
99 }
100
101 public Builder id(String id) {
102 this.id = id;
103 return this;
104 }
105
106 public Builder host(String host) {
107 this.host = host;
108 return this;
109 }
110
111 public Builder port(int port) {
112 this.port = port;
113 return this;
114 }
115
116 public Builder ssl(boolean ssl) {
117 this.ssl = ssl;
118 return this;
119 }
120
121 public Builder proxy(Proxy proxy) {
122 this.proxy = Optional.of(proxy);
123 return this;
124 }
125
126 public Builder authentication(Optional<Authentication> authentication) {
127 this.authentication = authentication;
128 return this;
129 }
130
131 public Builder authentication(Authentication authentication) {
132 return authentication(Optional.of(authentication));
133 }
134
135 public Builder readTimeoutMs(int readTimeoutMs) {
136 this.readTimeoutMs = readTimeoutMs;
137 return this;
138 }
139
140 public Builder readTimeoutSeconds(double val) {
141 return readTimeoutMs((int) Math.round(val * 1000));
142 }
143
144 public Builder retryIntervalMs(long retryIntervalMs) {
145 this.retryIntervalMs = retryIntervalMs;
146 return this;
147 }
148
149 public Builder retryIntervalSeconds(double val) {
150 return retryIntervalMs(Math.round(val * 1000));
151 }
152
153 public Builder enabled(boolean enabled) {
154 this.enabled = enabled;
155 return this;
156 }
157
158 public Connection build() {
159 return new Connection(id, host, port, ssl, authentication, readTimeoutMs,
160 retryIntervalMs, enabled, proxy);
161 }
162 }
163
164 @Override
165 public Observable<String> lines() {
166 return lines;
167 }
168
169 @Override
170 public String toString() {
171 StringBuilder b = new StringBuilder();
172 b.append("Connection [id=");
173 b.append(id);
174 b.append(", host=");
175 b.append(host);
176 b.append(", port=");
177 b.append(port);
178 b.append(", ssl=");
179 b.append(ssl);
180 b.append(", authentication=");
181 b.append(authentication);
182 b.append(", readTimeoutMs=");
183 b.append(readTimeoutMs);
184 b.append(", retryIntervalMs=");
185 b.append(retryIntervalMs);
186 b.append(", enabled=");
187 b.append(enabled);
188 b.append(", proxy=");
189 b.append(proxy);
190 b.append("]");
191 return b.toString();
192 }
193
194 }