1 package au.gov.amsa.util.rx;
2
3 import java.util.ArrayList;
4 import java.util.List;
5
6 import rx.Observable.Operator;
7 import rx.Subscriber;
8 import rx.functions.Func1;
9
10
11
12
13
14
15
16
17
18
19 public class OperatorDynamicBuffer<T> implements Operator<List<T>, T> {
20
21 private static final Object NOT_SET = new Object();
22 private static final Object NULL_SENTINEL = new Object();
23 private final Func1<T, ?> startFunction;
24 private final Func1<T, ?> whileFunction;
25
26
27
28
29
30
31
32
33
34
35
36 public OperatorDynamicBuffer(Func1<T, ?> startFunction, Func1<T, ?> whileFunction) {
37 this.startFunction = startFunction;
38 this.whileFunction = whileFunction;
39 }
40
41 @Override
42 public Subscriber<? super T> call(Subscriber<? super List<T>> child) {
43 return new Subscriber<T>(child) {
44
45 private List<T> list;
46 private List<T> list2;
47 private Object lastStartValue = NOT_SET;
48 private Object lastWhileValue = NOT_SET;
49
50 @Override
51 public void onCompleted() {
52 if (list != null && !isUnsubscribed())
53 child.onNext(list);
54 if (list2 != null && !isUnsubscribed())
55 child.onNext(list2);
56 if (!isUnsubscribed())
57 child.onCompleted();
58 }
59
60 @Override
61 public void onError(Throwable e) {
62 child.onError(e);
63 }
64
65 @Override
66 public void onNext(T t) {
67 if (list == null)
68 list = new ArrayList<T>();
69 Object startValue = replaceNull(startFunction.call(t));
70 Object whileValue = replaceNull(whileFunction.call(t));
71 if (lastWhileValue == NOT_SET) {
72 list.add(t);
73 request(1);
74 } else {
75 if (lastWhileValue.equals(whileValue)) {
76 list.add(t);
77 request(1);
78 } else {
79
80 child.onNext(list);
81 list = null;
82 }
83 if (lastStartValue != NOT_SET && !lastStartValue.equals(startValue)) {
84
85
86 if (list2 == null) {
87 list2 = new ArrayList<T>();
88 } else {
89 child.onError(new RuntimeException("unexpected"));
90 return;
91 }
92 list2.add(t);
93 }
94 if (list == null && list2 != null) {
95
96 list = list2;
97 list2 = null;
98 }
99 }
100 lastStartValue = startValue;
101 lastWhileValue = whileValue;
102 }
103 };
104 }
105
106 private static Object replaceNull(Object o) {
107 if (o == null)
108 return NULL_SENTINEL;
109 else
110 return o;
111 }
112 }