1 package au.gov.amsa.craft.analyzer.wms;
2
3 import java.io.File;
4 import java.util.List;
5 import java.util.concurrent.atomic.AtomicLong;
6 import java.util.regex.Pattern;
7
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import rx.Observable;
12 import rx.Scheduler;
13 import rx.functions.Func1;
14 import rx.schedulers.Schedulers;
15 import au.gov.amsa.navigation.DriftCandidate;
16 import au.gov.amsa.navigation.DriftCandidates;
17 import au.gov.amsa.navigation.DriftDetector;
18 import au.gov.amsa.navigation.VesselPosition;
19 import au.gov.amsa.navigation.VesselPositions;
20 import au.gov.amsa.risky.format.BinaryFixes;
21 import au.gov.amsa.risky.format.Fix;
22 import au.gov.amsa.risky.format.NavigationalStatus;
23 import au.gov.amsa.util.Files;
24
25 import com.google.common.base.Optional;
26
27 public class Sources {
28 private static final Logger log = LoggerFactory.getLogger(Sources.class);
29
30 public static Observable<VesselPosition> fixes() {
31 List<File> files = Files.find(new File("/home/dave/Downloads/binary-fixes-2014-5-minutes"),
32 Pattern.compile(".*\\.track"));
33 log.info("files=" + files.size());
34 final AtomicLong num = new AtomicLong();
35 return Observable
36
37 .from(files)
38
39 .buffer(Math.max(1, files.size() / Runtime.getRuntime().availableProcessors() - 1))
40
41 .flatMap(detectDrift(num, Schedulers.computation()))
42 .map(VesselPositions.TO_VESSEL_POSITION);
43 }
44
45 public static Observable<VesselPosition> fixes2(File file) {
46 return DriftCandidates.fromCsv(file, false).filter(new Func1<DriftCandidate, Boolean>() {
47
48 @Override
49 public Boolean call(DriftCandidate c) {
50 return !c.fix().navigationalStatus().isPresent()
51 || c.fix().navigationalStatus().get() != NavigationalStatus.ENGAGED_IN_FISHING;
52 }
53 }).map(VesselPositions.toVesselPosition(new Func1<DriftCandidate, Optional<?>>() {
54 @Override
55 public Optional<Long> call(DriftCandidate c) {
56 return Optional.of(c.driftingSince());
57 }
58 }));
59 }
60
61 private static Func1<List<File>, Observable<Fix>> detectDrift(AtomicLong num,
62 final Scheduler scheduler) {
63 return new Func1<List<File>, Observable<Fix>>() {
64
65 @Override
66 public Observable<Fix> call(List<File> files) {
67 return BinaryFixes.from(files).compose(DriftDetector.detectDrift())
68 .map(new Func1<DriftCandidate, Fix>() {
69
70 @Override
71 public Fix call(DriftCandidate c) {
72 return c.fix();
73 }
74 }).subscribeOn(scheduler);
75 }
76 };
77 }
78 }