View Javadoc
1   package au.gov.amsa.navigation.ais;
2   
3   import java.util.ArrayList;
4   import java.util.Collections;
5   import java.util.Comparator;
6   
7   import rx.Observable.Operator;
8   import rx.Subscriber;
9   
10  public class SortOperator<T> implements Operator<T, T> {
11  
12  	private ArrayList<T> list;
13  	private Comparator<T> comparator;
14  
15  	public SortOperator(Comparator<T> comparator, int size) {
16  		this.comparator = comparator;
17  		this.list = new ArrayList<T>(size);
18  	}
19  
20  	@Override
21  	public Subscriber<? super T> call(final Subscriber<? super T> child) {
22  
23  		return new Subscriber<T>() {
24  
25  			long count = 0;
26  			
27  			@Override
28  			public void onCompleted() {
29  				Collections.sort(list, comparator);
30  				for (T t : list)
31  					if (child.isUnsubscribed())
32  						return;
33  					else
34  						child.onNext(t);
35  				child.onCompleted();
36  			}
37  
38  			@Override
39  			public void onError(Throwable e) {
40  				child.onError(e);
41  			}
42  
43  			@Override
44  			public void onNext(T t) {
45  				if (++count % 100000==0) {
46  					System.out.println("count="+ count);
47  				}
48  				list.add(t);
49  			}
50  		};
51  	}
52  
53  }