1 package au.gov.amsa.navigation;
2
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.IOException;
6 import java.io.InputStream;
7 import java.io.InputStreamReader;
8 import java.io.Reader;
9 import java.util.zip.GZIPInputStream;
10
11 import com.github.davidmoten.rx.Checked;
12 import com.google.common.base.Optional;
13
14 import au.gov.amsa.risky.format.AisClass;
15 import au.gov.amsa.risky.format.FixImpl;
16 import au.gov.amsa.risky.format.NavigationalStatus;
17 import au.gov.amsa.streams.Strings;
18 import rx.Observable;
19 import rx.functions.Action1;
20 import rx.functions.Func0;
21 import rx.functions.Func1;
22
23 public final class DriftCandidates {
24
25 public static Observable<DriftCandidate> fromCsv(Reader reader) {
26 return Strings.lines(reader)
27
28 .filter(nonBlankLinesOnly())
29
30 .map(line -> toDriftCandidate(line));
31 }
32
33 private static DriftCandidate toDriftCandidate(String line) {
34 String[] items = line.split(",");
35 int i = 0;
36 int mmsi = Integer.parseInt(items[i++]);
37 float lat = Float.parseFloat(items[i++]);
38 float lon = Float.parseFloat(items[i++]);
39 long time = Long.parseLong(items[i++]);
40 String cls = items[i++];
41 float course = Float.parseFloat(items[i++]);
42 float heading = Float.parseFloat(items[i++]);
43 float speedKnots = Float.parseFloat(items[i++]);
44 String status = items[i++];
45 long driftingSince = Long.parseLong(items[i++]);
46 final Optional<NavigationalStatus> navigationalStatus;
47 if (status.trim().length() == 0)
48 navigationalStatus = Optional.absent();
49 else
50 navigationalStatus = Optional.of(NavigationalStatus.valueOf(status));
51 final AisClass aisClass;
52 if (cls.trim().length() == 0)
53 throw new RuntimeException("cls should not be empty");
54 else if (AisClass.A.name().equals(cls))
55 aisClass = AisClass.A;
56 else
57 aisClass = AisClass.B;
58 FixImpl fix = new FixImpl(mmsi, lat, lon, time, Optional.<Integer> absent(),
59 Optional.<Short> absent(), navigationalStatus, Optional.of(speedKnots),
60 Optional.of(course), Optional.of(heading), aisClass);
61 return new DriftCandidate(fix, driftingSince);
62 }
63
64 public static Observable<DriftCandidate> fromCsv(final File file, boolean zipped) {
65 Action1<Reader> disposeAction = reader -> {
66 try {
67 reader.close();
68 } catch (IOException e) {
69
70 }
71 };
72 Func0<Reader> resourceFactory = Checked.f0(() -> {
73 InputStream is;
74 if (zipped)
75 is = new GZIPInputStream(new FileInputStream(file));
76 else
77 is = new FileInputStream(file);
78 return new InputStreamReader(is);
79 });
80 Func1<Reader, Observable<DriftCandidate>> obFactory = reader -> fromCsv(reader);
81 return Observable.using(resourceFactory, obFactory, disposeAction, true);
82 }
83
84 private static Func1<String, Boolean> nonBlankLinesOnly() {
85 return line -> line.trim().length() > 0;
86 }
87
88 }