View Javadoc
1   package au.gov.amsa.craft.analyzer.wms;
2   
3   import java.io.File;
4   import java.util.List;
5   import java.util.concurrent.atomic.AtomicLong;
6   import java.util.regex.Pattern;
7   
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  
11  import rx.Observable;
12  import rx.Scheduler;
13  import rx.functions.Func1;
14  import rx.schedulers.Schedulers;
15  import au.gov.amsa.navigation.DriftCandidate;
16  import au.gov.amsa.navigation.DriftCandidates;
17  import au.gov.amsa.navigation.DriftDetector;
18  import au.gov.amsa.navigation.VesselPosition;
19  import au.gov.amsa.navigation.VesselPositions;
20  import au.gov.amsa.risky.format.BinaryFixes;
21  import au.gov.amsa.risky.format.Fix;
22  import au.gov.amsa.risky.format.NavigationalStatus;
23  import au.gov.amsa.util.Files;
24  
25  import com.google.common.base.Optional;
26  
27  public class Sources {
28      private static final Logger log = LoggerFactory.getLogger(Sources.class);
29  
30      public static Observable<VesselPosition> fixes() {
31          List<File> files = Files.find(new File("/home/dave/Downloads/binary-fixes-2014-5-minutes"),
32                  Pattern.compile(".*\\.track"));
33          log.info("files=" + files.size());
34          final AtomicLong num = new AtomicLong();
35          return Observable
36                  // list files
37                  .from(files)
38                  // share the load between processors
39                  .buffer(Math.max(1, files.size() / Runtime.getRuntime().availableProcessors() - 1))
40                  // search each list of files for drift detections
41                  .flatMap(detectDrift(num, Schedulers.computation()))
42                  .map(VesselPositions.TO_VESSEL_POSITION);
43      }
44  
45      public static Observable<VesselPosition> fixes2(File file) {
46          return DriftCandidates.fromCsv(file, false).filter(new Func1<DriftCandidate, Boolean>() {
47  
48              @Override
49              public Boolean call(DriftCandidate c) {
50                  return !c.fix().navigationalStatus().isPresent()
51                          || c.fix().navigationalStatus().get() != NavigationalStatus.ENGAGED_IN_FISHING;
52              }
53          }).map(VesselPositions.toVesselPosition(new Func1<DriftCandidate, Optional<?>>() {
54              @Override
55              public Optional<Long> call(DriftCandidate c) {
56                  return Optional.of(c.driftingSince());
57              }
58          }));
59      }
60  
61      private static Func1<List<File>, Observable<Fix>> detectDrift(AtomicLong num,
62              final Scheduler scheduler) {
63          return new Func1<List<File>, Observable<Fix>>() {
64  
65              @Override
66              public Observable<Fix> call(List<File> files) {
67                  return BinaryFixes.from(files).compose(DriftDetector.detectDrift())
68                          .map(new Func1<DriftCandidate, Fix>() {
69  
70                              @Override
71                              public Fix call(DriftCandidate c) {
72                                  return c.fix();
73                              }
74                          }).subscribeOn(scheduler);
75              }
76          };
77      }
78  }