1 package au.gov.amsa.streams;
2
3 import java.util.concurrent.TimeUnit;
4 import java.util.concurrent.atomic.AtomicBoolean;
5
6 import rx.Observable.Operator;
7 import rx.Scheduler;
8 import rx.Scheduler.Worker;
9 import rx.Subscriber;
10 import rx.observers.Subscribers;
11 import rx.subjects.PublishSubject;
12
13 public class OperatorSubscriptionInterval<T> implements Operator<T, T> {
14
15 private final long duration;
16 private final TimeUnit units;
17 private final Scheduler scheduler;
18
19 public OperatorSubscriptionInterval(long duration, TimeUnit units, Scheduler scheduler) {
20 this.duration = duration;
21 this.units = units;
22 this.scheduler = scheduler;
23 }
24
25 private final AtomicBoolean firstTime = new AtomicBoolean(true);
26
27 @Override
28 public Subscriber<? super T> call(Subscriber<? super T> child) {
29 Subscriber<T> parent;
30 if (firstTime.compareAndSet(true, false)) {
31
32 parent = Subscribers.from(child);
33 } else {
34 final PublishSubject<T> subject = PublishSubject.create();
35 Worker worker = scheduler.createWorker();
36 worker.schedule(() -> {
37 subject.unsafeSubscribe(child);
38 }, duration, units);
39 child.add(worker);
40 parent = Subscribers.from(subject);
41 }
42 child.add(parent);
43 return parent;
44 }
45 }