View Javadoc
1   package au.gov.amsa.ais.router.model;
2   
3   import java.nio.charset.StandardCharsets;
4   import java.util.Optional;
5   
6   import com.github.davidmoten.guavamini.Preconditions;
7   import com.github.davidmoten.rx.Transformers;
8   
9   import au.gov.amsa.streams.StringSockets;
10  import rx.Observable;
11  
12  public final class Connection implements GroupMember {
13  
14      private final String id;
15      private final String host;
16      private final int port;
17      private final boolean ssl;
18      private final Optional<Authentication> authentication;
19      private final int readTimeoutMs;
20      private final long retryIntervalMs;
21      private final boolean enabled;
22      private final Optional<Proxy> proxy;
23      private final Observable<String> lines;
24  
25      private Connection(String id, String host, int port, boolean ssl,
26              Optional<Authentication> authentication, int readTimeoutMs, long retryIntervalMs,
27              boolean enabled, Optional<Proxy> proxy) {
28          Util.verifyId(id);
29          Preconditions.checkNotNull(host);
30          Preconditions.checkArgument(port > 0 && port <= 65535, "port must be between 0 and 65535");
31          Preconditions.checkNotNull(authentication);
32          Preconditions.checkArgument(readTimeoutMs >= 0, "readTimeMs must be >=0");
33          Preconditions.checkArgument(retryIntervalMs >= 0, "retryIntervalMs must be >=0");
34          Preconditions.checkNotNull(proxy);
35          this.id = id;
36          this.host = host;
37          this.port = port;
38          this.ssl = ssl;
39          this.authentication = authentication;
40          this.readTimeoutMs = readTimeoutMs;
41          this.retryIntervalMs = retryIntervalMs;
42          this.enabled = enabled;
43          this.proxy = proxy;
44          // multiple parent groups share the same stream
45          this.lines = StringSockets.from(host).charset(StandardCharsets.UTF_8).port(port)
46                  .quietTimeoutMs(readTimeoutMs).reconnectDelayMs(retryIntervalMs).create()
47                  .compose(Transformers.split("\n")).map(s -> s.trim()).share();
48      }
49  
50      public String id() {
51          return id;
52      }
53  
54      public String host() {
55          return host;
56      }
57  
58      public int port() {
59          return port;
60      }
61  
62      public boolean ssl() {
63          return ssl;
64      }
65  
66      public Optional<Authentication> authentication() {
67          return authentication;
68      }
69  
70      public int readTimeoutMs() {
71          return readTimeoutMs;
72      }
73  
74      public long retryIntervalMs() {
75          return retryIntervalMs;
76      }
77  
78      public boolean enabled() {
79          return enabled;
80      }
81  
82      public static Builder builder() {
83          return new Builder();
84      }
85  
86      public static class Builder {
87  
88          private String id;
89          private String host;
90          private int port;
91          private boolean ssl;
92          private Optional<Authentication> authentication = Optional.empty();
93          private int readTimeoutMs;
94          private long retryIntervalMs;
95          private boolean enabled = true;
96          private Optional<Proxy> proxy = Optional.empty();
97  
98          private Builder() {
99          }
100 
101         public Builder id(String id) {
102             this.id = id;
103             return this;
104         }
105 
106         public Builder host(String host) {
107             this.host = host;
108             return this;
109         }
110 
111         public Builder port(int port) {
112             this.port = port;
113             return this;
114         }
115 
116         public Builder ssl(boolean ssl) {
117             this.ssl = ssl;
118             return this;
119         }
120 
121         public Builder proxy(Proxy proxy) {
122             this.proxy = Optional.of(proxy);
123             return this;
124         }
125 
126         public Builder authentication(Optional<Authentication> authentication) {
127             this.authentication = authentication;
128             return this;
129         }
130 
131         public Builder authentication(Authentication authentication) {
132             return authentication(Optional.of(authentication));
133         }
134 
135         public Builder readTimeoutMs(int readTimeoutMs) {
136             this.readTimeoutMs = readTimeoutMs;
137             return this;
138         }
139 
140         public Builder readTimeoutSeconds(double val) {
141             return readTimeoutMs((int) Math.round(val * 1000));
142         }
143 
144         public Builder retryIntervalMs(long retryIntervalMs) {
145             this.retryIntervalMs = retryIntervalMs;
146             return this;
147         }
148 
149         public Builder retryIntervalSeconds(double val) {
150             return retryIntervalMs(Math.round(val * 1000));
151         }
152 
153         public Builder enabled(boolean enabled) {
154             this.enabled = enabled;
155             return this;
156         }
157 
158         public Connection build() {
159             return new Connection(id, host, port, ssl, authentication, readTimeoutMs,
160                     retryIntervalMs, enabled, proxy);
161         }
162     }
163 
164     @Override
165     public Observable<String> lines() {
166         return lines;
167     }
168 
169     @Override
170     public String toString() {
171         StringBuilder b = new StringBuilder();
172         b.append("Connection [id=");
173         b.append(id);
174         b.append(", host=");
175         b.append(host);
176         b.append(", port=");
177         b.append(port);
178         b.append(", ssl=");
179         b.append(ssl);
180         b.append(", authentication=");
181         b.append(authentication);
182         b.append(", readTimeoutMs=");
183         b.append(readTimeoutMs);
184         b.append(", retryIntervalMs=");
185         b.append(retryIntervalMs);
186         b.append(", enabled=");
187         b.append(enabled);
188         b.append(", proxy=");
189         b.append(proxy);
190         b.append("]");
191         return b.toString();
192     }
193 
194 }