View Javadoc
1   package au.gov.amsa.ais.router;
2   
3   import org.slf4j.Logger;
4   import org.slf4j.LoggerFactory;
5   
6   import au.gov.amsa.ais.router.model.Port;
7   import rx.Observable;
8   import rx.Subscriber;
9   import rx.schedulers.Schedulers;
10  
11  public class Router {
12  
13      private static final Logger log = LoggerFactory.getLogger(Router.class);
14  
15      public static Subscriber<Port> start(Port... ports) {
16          Subscriber<Port> subscriber = createSubscriber();
17          Observable //
18                  .from(ports) //
19                  .flatMap(port -> Observable //
20                          .just(port) //
21                          .doOnNext(p -> p.start()) //
22                          .subscribeOn(Schedulers.io()))
23                  .subscribe(subscriber);
24          return subscriber;
25      }
26  
27      private static Subscriber<Port> createSubscriber() {
28          return new Subscriber<Port>() {
29  
30              @Override
31              public void onCompleted() {
32                  log.info("all ports stopped");
33              }
34  
35              @Override
36              public void onError(Throwable e) {
37                  log.error(e.getMessage(), e);
38              }
39  
40              @Override
41              public void onNext(Port t) {
42  
43              }
44          };
45      }
46  }