1 package au.gov.amsa.streams;
2
3 import java.util.concurrent.atomic.AtomicBoolean;
4
5 import rx.Observable.OnSubscribe;
6 import rx.Producer;
7 import rx.Subscriber;
8
9 public class OnSubscribeJustOneWithBackpressure<T> implements OnSubscribe<T> {
10
11 private final T value;
12
13 public OnSubscribeJustOneWithBackpressure(T t) {
14 value = t;
15 }
16
17 @Override
18 public void call(final Subscriber<? super T> child) {
19
20 child.setProducer(new Producer() {
21
22 private final AtomicBoolean emitted = new AtomicBoolean(false);
23
24 @Override
25 public void request(long n) {
26 if (n > 0) {
27 if (emitted.compareAndSet(false, true)) {
28 child.onNext(value);
29 child.onCompleted();
30 }
31 }
32 }
33 });
34 }
35
36 }