1 package au.gov.amsa.ais.router;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5
6 import au.gov.amsa.ais.router.model.Port;
7 import rx.Observable;
8 import rx.Subscriber;
9 import rx.schedulers.Schedulers;
10
11 public class Router {
12
13 private static final Logger log = LoggerFactory.getLogger(Router.class);
14
15 public static Subscriber<Port> start(Port... ports) {
16 Subscriber<Port> subscriber = createSubscriber();
17 Observable
18 .from(ports)
19 .flatMap(port -> Observable
20 .just(port)
21 .doOnNext(p -> p.start())
22 .subscribeOn(Schedulers.io()))
23 .subscribe(subscriber);
24 return subscriber;
25 }
26
27 private static Subscriber<Port> createSubscriber() {
28 return new Subscriber<Port>() {
29
30 @Override
31 public void onCompleted() {
32 log.info("all ports stopped");
33 }
34
35 @Override
36 public void onError(Throwable e) {
37 log.error(e.getMessage(), e);
38 }
39
40 @Override
41 public void onNext(Port t) {
42
43 }
44 };
45 }
46 }