1 package au.gov.amsa.streams; 2 3 import java.util.concurrent.atomic.AtomicBoolean; 4 5 import rx.Observable.OnSubscribe; 6 import rx.Producer; 7 import rx.Subscriber; 8 9 public class OnSubscribeJustOneWithBackpressure<T> implements OnSubscribe<T> { 10 11 private final T value; 12 13 public OnSubscribeJustOneWithBackpressure(T t) { 14 value = t; 15 } 16 17 @Override 18 public void call(final Subscriber<? super T> child) { 19 20 child.setProducer(new Producer() { 21 22 private final AtomicBoolean emitted = new AtomicBoolean(false); 23 24 @Override 25 public void request(long n) { 26 if (n > 0) { 27 if (emitted.compareAndSet(false, true)) { 28 child.onNext(value); 29 child.onCompleted(); 30 } 31 } 32 } 33 }); 34 } 35 36 }