View Javadoc
1   package au.gov.amsa.navigation;
2   
3   import java.io.File;
4   import java.io.FileInputStream;
5   import java.io.IOException;
6   import java.io.InputStream;
7   import java.io.InputStreamReader;
8   import java.io.Reader;
9   import java.util.zip.GZIPInputStream;
10  
11  import com.github.davidmoten.rx.Checked;
12  import com.google.common.base.Optional;
13  
14  import au.gov.amsa.risky.format.AisClass;
15  import au.gov.amsa.risky.format.FixImpl;
16  import au.gov.amsa.risky.format.NavigationalStatus;
17  import au.gov.amsa.streams.Strings;
18  import rx.Observable;
19  import rx.functions.Action1;
20  import rx.functions.Func0;
21  import rx.functions.Func1;
22  
23  public final class DriftCandidates {
24  
25      public static Observable<DriftCandidate> fromCsv(Reader reader) {
26          return Strings.lines(reader)
27                  // remove blank lines
28                  .filter(nonBlankLinesOnly())
29                  // parse candidate
30                  .map(line -> toDriftCandidate(line));
31      }
32  
33      private static DriftCandidate toDriftCandidate(String line) {
34          String[] items = line.split(",");
35          int i = 0;
36          int mmsi = Integer.parseInt(items[i++]);
37          float lat = Float.parseFloat(items[i++]);
38          float lon = Float.parseFloat(items[i++]);
39          long time = Long.parseLong(items[i++]);
40          String cls = items[i++];
41          float course = Float.parseFloat(items[i++]);
42          float heading = Float.parseFloat(items[i++]);
43          float speedKnots = Float.parseFloat(items[i++]);
44          String status = items[i++];
45          long driftingSince = Long.parseLong(items[i++]);
46          final Optional<NavigationalStatus> navigationalStatus;
47          if (status.trim().length() == 0)
48              navigationalStatus = Optional.absent();
49          else
50              navigationalStatus = Optional.of(NavigationalStatus.valueOf(status));
51          final AisClass aisClass;
52          if (cls.trim().length() == 0)
53              throw new RuntimeException("cls should not be empty");
54          else if (AisClass.A.name().equals(cls))
55              aisClass = AisClass.A;
56          else
57              aisClass = AisClass.B;
58          FixImpl fix = new FixImpl(mmsi, lat, lon, time, Optional.<Integer> absent(),
59                  Optional.<Short> absent(), navigationalStatus, Optional.of(speedKnots),
60                  Optional.of(course), Optional.of(heading), aisClass);
61          return new DriftCandidate(fix, driftingSince);
62      }
63  
64      public static Observable<DriftCandidate> fromCsv(final File file, boolean zipped) {
65          Action1<Reader> disposeAction = reader -> {
66              try {
67                  reader.close();
68              } catch (IOException e) {
69                  // ignore
70              }
71          };
72          Func0<Reader> resourceFactory = Checked.f0(() -> {
73              InputStream is;
74              if (zipped)
75                  is = new GZIPInputStream(new FileInputStream(file));
76              else
77                  is = new FileInputStream(file);
78              return new InputStreamReader(is);
79          });
80          Func1<Reader, Observable<DriftCandidate>> obFactory = reader -> fromCsv(reader);
81          return Observable.using(resourceFactory, obFactory, disposeAction, true);
82      }
83  
84      private static Func1<String, Boolean> nonBlankLinesOnly() {
85          return line -> line.trim().length() > 0;
86      }
87  
88  }