View Javadoc
1   package au.gov.amsa.util.nmea.saver;
2   
3   import java.io.BufferedWriter;
4   import java.io.File;
5   import java.io.FileOutputStream;
6   import java.io.IOException;
7   import java.io.OutputStreamWriter;
8   
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  
12  import rx.Observable;
13  import rx.Scheduler;
14  import rx.Subscriber;
15  import rx.schedulers.Schedulers;
16  import au.gov.amsa.util.nmea.NmeaUtil;
17  
18  import com.google.common.annotations.VisibleForTesting;
19  import com.google.common.base.Optional;
20  
21  public class NmeaSaver {
22  
23      private static Logger log = LoggerFactory.getLogger(NmeaSaver.class);
24  
25      private volatile Subscriber<String> subscriber;
26  
27      private final FileFactory factory;
28  
29      private final Observable<String> source;
30  
31      private final Clock clock;
32  
33      @VisibleForTesting
34      NmeaSaver(Observable<String> nmea, FileFactory factory, Clock clock) {
35          this.source = nmea;
36          this.factory = factory;
37          this.clock = clock;
38      }
39  
40      public NmeaSaver(Observable<String> nmea, FileFactory factory) {
41          this(nmea, factory, new SystemClock());
42      }
43  
44      public void start() {
45          start(Schedulers.io());
46      }
47  
48      public void start(Scheduler scheduler) {
49          subscriber = createSubscriber(factory, clock);
50          source.subscribeOn(scheduler).subscribe(subscriber);
51      }
52  
53      public void stop() {
54          if (subscriber != null)
55              subscriber.unsubscribe();
56      }
57  
58      private static Subscriber<String> createSubscriber(final FileFactory factory,
59              final Clock clock) {
60  
61          return new Subscriber<String>() {
62  
63              Optional<BufferedWriter> current = Optional.absent();
64              Optional<String> currentKey = Optional.absent();
65              boolean firstLineInFile = true;
66  
67              @Override
68              public void onCompleted() {
69                  log.warn("should not complete");
70                  closeCurrentWriter();
71              }
72  
73              @Override
74              public void onError(Throwable e) {
75                  log.error(e.getMessage(), e);
76                  closeCurrentWriter();
77              }
78  
79              private void closeCurrentWriter() {
80                  if (current.isPresent())
81                      try {
82                          current.get().close();
83                      } catch (IOException e1) {
84                          log.error(e1.getMessage(), e1);
85                      }
86              }
87  
88              @Override
89              public void onNext(String line) {
90                  try {
91                      long now = clock.getTimeMs();
92                      String amendedLine = NmeaUtil.supplementWithTime(line, now);
93                      String fileKey = factory.key(amendedLine, now);
94                      if (!currentKey.isPresent() || !fileKey.equals(currentKey.get())) {
95                          if (current.isPresent())
96                              current.get().close();
97                          File file = factory.file(amendedLine, now);
98                          firstLineInFile = !file.exists();
99                          current = Optional.of((new BufferedWriter(
100                                 new OutputStreamWriter(new FileOutputStream(file, true)))));
101                         currentKey = Optional.of(fileKey);
102                     }
103                     if (!firstLineInFile)
104                         current.get().write('\n');
105                     firstLineInFile = false;
106                     current.get().write(amendedLine);
107                 } catch (IOException e) {
108                     log.error(e.getMessage(), e);
109                 } catch (RuntimeException e) {
110                     // TODO monitor the count of these JIRA ER-2878
111                     // could not parse the message, ignore
112                     // log.warn(e.getMessage() + ":" + line, e);
113                 }
114             }
115         };
116     }
117 
118 }