1 package au.gov.amsa.util.nmea.saver;
2
3 import java.io.BufferedWriter;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.OutputStreamWriter;
8
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import rx.Observable;
13 import rx.Scheduler;
14 import rx.Subscriber;
15 import rx.schedulers.Schedulers;
16 import au.gov.amsa.util.nmea.NmeaUtil;
17
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.base.Optional;
20
21 public class NmeaSaver {
22
23 private static Logger log = LoggerFactory.getLogger(NmeaSaver.class);
24
25 private volatile Subscriber<String> subscriber;
26
27 private final FileFactory factory;
28
29 private final Observable<String> source;
30
31 private final Clock clock;
32
33 @VisibleForTesting
34 NmeaSaver(Observable<String> nmea, FileFactory factory, Clock clock) {
35 this.source = nmea;
36 this.factory = factory;
37 this.clock = clock;
38 }
39
40 public NmeaSaver(Observable<String> nmea, FileFactory factory) {
41 this(nmea, factory, new SystemClock());
42 }
43
44 public void start() {
45 start(Schedulers.io());
46 }
47
48 public void start(Scheduler scheduler) {
49 subscriber = createSubscriber(factory, clock);
50 source.subscribeOn(scheduler).subscribe(subscriber);
51 }
52
53 public void stop() {
54 if (subscriber != null)
55 subscriber.unsubscribe();
56 }
57
58 private static Subscriber<String> createSubscriber(final FileFactory factory,
59 final Clock clock) {
60
61 return new Subscriber<String>() {
62
63 Optional<BufferedWriter> current = Optional.absent();
64 Optional<String> currentKey = Optional.absent();
65 boolean firstLineInFile = true;
66
67 @Override
68 public void onCompleted() {
69 log.warn("should not complete");
70 closeCurrentWriter();
71 }
72
73 @Override
74 public void onError(Throwable e) {
75 log.error(e.getMessage(), e);
76 closeCurrentWriter();
77 }
78
79 private void closeCurrentWriter() {
80 if (current.isPresent())
81 try {
82 current.get().close();
83 } catch (IOException e1) {
84 log.error(e1.getMessage(), e1);
85 }
86 }
87
88 @Override
89 public void onNext(String line) {
90 try {
91 long now = clock.getTimeMs();
92 String amendedLine = NmeaUtil.supplementWithTime(line, now);
93 String fileKey = factory.key(amendedLine, now);
94 if (!currentKey.isPresent() || !fileKey.equals(currentKey.get())) {
95 if (current.isPresent())
96 current.get().close();
97 File file = factory.file(amendedLine, now);
98 firstLineInFile = !file.exists();
99 current = Optional.of((new BufferedWriter(
100 new OutputStreamWriter(new FileOutputStream(file, true)))));
101 currentKey = Optional.of(fileKey);
102 }
103 if (!firstLineInFile)
104 current.get().write('\n');
105 firstLineInFile = false;
106 current.get().write(amendedLine);
107 } catch (IOException e) {
108 log.error(e.getMessage(), e);
109 } catch (RuntimeException e) {
110
111
112
113 }
114 }
115 };
116 }
117
118 }