1 package au.gov.amsa.ais.rx;
2
3 import java.io.IOException;
4 import java.io.OutputStream;
5
6 import rx.Observable;
7 import rx.functions.Func1;
8
9 public class RxUtil {
10
11 public static <T> Func1<T, T> println(final OutputStream out) {
12 return t -> {
13 try {
14 out.write(t.toString().getBytes());
15 out.write('\n');
16 } catch (IOException e) {
17 throw new RuntimeException(e);
18 }
19 return t;
20 };
21 }
22
23 public static <T> Func1<T, T> println() {
24 return println(System.out);
25 }
26
27 public static <T> void print(Observable<T> o) {
28 o.materialize().toBlocking().forEach(System.out::println);
29 }
30
31 @SuppressWarnings("unchecked")
32 public static <T> Observable<T> concatButIgnoreFirstSequence(Observable<?> o1, Observable<T> o2) {
33 return Observable.concat((Observable<T>) o1.filter(com.github.davidmoten.rx.Functions
34 .<Object> alwaysFalse()), o2);
35 }
36
37 }