1 package au.gov.amsa.navigation.ais;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.Comparator;
6
7 import rx.Observable.Operator;
8 import rx.Subscriber;
9
10 public class SortOperator<T> implements Operator<T, T> {
11
12 private ArrayList<T> list;
13 private Comparator<T> comparator;
14
15 public SortOperator(Comparator<T> comparator, int size) {
16 this.comparator = comparator;
17 this.list = new ArrayList<T>(size);
18 }
19
20 @Override
21 public Subscriber<? super T> call(final Subscriber<? super T> child) {
22
23 return new Subscriber<T>() {
24
25 long count = 0;
26
27 @Override
28 public void onCompleted() {
29 Collections.sort(list, comparator);
30 for (T t : list)
31 if (child.isUnsubscribed())
32 return;
33 else
34 child.onNext(t);
35 child.onCompleted();
36 }
37
38 @Override
39 public void onError(Throwable e) {
40 child.onError(e);
41 }
42
43 @Override
44 public void onNext(T t) {
45 if (++count % 100000==0) {
46 System.out.println("count="+ count);
47 }
48 list.add(t);
49 }
50 };
51 }
52
53 }