View Javadoc
1   package au.gov.amsa.streams;
2   
3   import java.util.concurrent.atomic.AtomicBoolean;
4   
5   import rx.Observable.OnSubscribe;
6   import rx.Producer;
7   import rx.Subscriber;
8   
9   public class OnSubscribeJustOneWithBackpressure<T> implements OnSubscribe<T> {
10  
11  	private final T value;
12  
13  	public OnSubscribeJustOneWithBackpressure(T t) {
14  		value = t;
15  	}
16  
17  	@Override
18  	public void call(final Subscriber<? super T> child) {
19  
20  		child.setProducer(new Producer() {
21  
22  			private final AtomicBoolean emitted = new AtomicBoolean(false);
23  
24  			@Override
25  			public void request(long n) {
26  				if (n > 0) {
27  					if (emitted.compareAndSet(false, true)) {
28  						child.onNext(value);
29  						child.onCompleted();
30  					}
31  				}
32  			}
33  		});
34  	}
35  
36  }