View Javadoc
1   package au.gov.amsa.util.rx;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   
6   import rx.Observable.Operator;
7   import rx.Subscriber;
8   import rx.functions.Func1;
9   
10  /**
11   * Buffers items into lists that can overlap but not more than two at a time.
12   * The commencement of the next buffer is signalled by a change in the value of
13   * {@code startFunction} and the finish of a buffer is signalled by a change in
14   * the value of the {@code whileFunction}.
15   *
16   * @param <T>
17   *            generic type
18   */
19  public class OperatorDynamicBuffer<T> implements Operator<List<T>, T> {
20  
21      private static final Object NOT_SET = new Object();
22      private static final Object NULL_SENTINEL = new Object();
23      private final Func1<T, ?> startFunction;
24      private final Func1<T, ?> whileFunction;
25  
26      /**
27       * Constructor.
28       * 
29       * @param startFunction
30       *            a change in the value of this function signals the start of a
31       *            new buffer
32       * @param whileFunction
33       *            a change in the value of this function signals the finish of a
34       *            buffer
35       */
36      public OperatorDynamicBuffer(Func1<T, ?> startFunction, Func1<T, ?> whileFunction) {
37          this.startFunction = startFunction;
38          this.whileFunction = whileFunction;
39      }
40  
41      @Override
42      public Subscriber<? super T> call(Subscriber<? super List<T>> child) {
43          return new Subscriber<T>(child) {
44  
45              private List<T> list;
46              private List<T> list2;
47              private Object lastStartValue = NOT_SET;
48              private Object lastWhileValue = NOT_SET;
49  
50              @Override
51              public void onCompleted() {
52                  if (list != null && !isUnsubscribed())
53                      child.onNext(list);
54                  if (list2 != null && !isUnsubscribed())
55                      child.onNext(list2);
56                  if (!isUnsubscribed())
57                      child.onCompleted();
58              }
59  
60              @Override
61              public void onError(Throwable e) {
62                  child.onError(e);
63              }
64  
65              @Override
66              public void onNext(T t) {
67                  if (list == null)
68                      list = new ArrayList<T>();
69                  Object startValue = replaceNull(startFunction.call(t));
70                  Object whileValue = replaceNull(whileFunction.call(t));
71                  if (lastWhileValue == NOT_SET) {
72                      list.add(t);
73                      request(1);
74                  } else {
75                      if (lastWhileValue.equals(whileValue)) {
76                          list.add(t);
77                          request(1);
78                      } else {
79                          // emit list 1
80                          child.onNext(list);
81                          list = null;
82                      }
83                      if (lastStartValue != NOT_SET && !lastStartValue.equals(startValue)) {
84                          // startFunction value has changed so we are ready to
85                          // start another buffer
86                          if (list2 == null) {
87                              list2 = new ArrayList<T>();
88                          } else {
89                              child.onError(new RuntimeException("unexpected"));
90                              return;
91                          }
92                          list2.add(t);
93                      }
94                      if (list == null && list2 != null) {
95                          // move second list into first position
96                          list = list2;
97                          list2 = null;
98                      }
99                  }
100                 lastStartValue = startValue;
101                 lastWhileValue = whileValue;
102             }
103         };
104     }
105 
106     private static Object replaceNull(Object o) {
107         if (o == null)
108             return NULL_SENTINEL;
109         else
110             return o;
111     }
112 }