View Javadoc
1   package au.gov.amsa.streams;
2   
3   import java.util.concurrent.TimeUnit;
4   import java.util.concurrent.atomic.AtomicBoolean;
5   
6   import rx.Observable.Operator;
7   import rx.Scheduler;
8   import rx.Scheduler.Worker;
9   import rx.Subscriber;
10  import rx.observers.Subscribers;
11  import rx.subjects.PublishSubject;
12  
13  public class OperatorSubscriptionInterval<T> implements Operator<T, T> {
14  
15      private final long duration;
16      private final TimeUnit units;
17      private final Scheduler scheduler;
18  
19      public OperatorSubscriptionInterval(long duration, TimeUnit units, Scheduler scheduler) {
20          this.duration = duration;
21          this.units = units;
22          this.scheduler = scheduler;
23      }
24  
25      private final AtomicBoolean firstTime = new AtomicBoolean(true);
26  
27      @Override
28      public Subscriber<? super T> call(Subscriber<? super T> child) {
29          Subscriber<T> parent;
30          if (firstTime.compareAndSet(true, false)) {
31              // don't delay subscription for the first time
32              parent = Subscribers.from(child);
33          } else {
34              final PublishSubject<T> subject = PublishSubject.create();
35              Worker worker = scheduler.createWorker();
36              worker.schedule(() -> {
37                  subject.unsafeSubscribe(child);
38              }, duration, units);
39              child.add(worker);
40              parent = Subscribers.from(subject);
41          }
42          child.add(parent);
43          return parent;
44      }
45  }