View Javadoc
1   package au.gov.amsa.risky.format;
2   
3   import java.io.File;
4   import java.text.DecimalFormat;
5   import java.util.List;
6   import java.util.concurrent.atomic.AtomicInteger;
7   import java.util.concurrent.atomic.AtomicLong;
8   import java.util.regex.Pattern;
9   
10  import org.slf4j.Logger;
11  import org.slf4j.LoggerFactory;
12  
13  import com.github.davidmoten.rx.Functions;
14  import com.github.davidmoten.util.Preconditions;
15  import com.google.common.annotations.VisibleForTesting;
16  
17  import au.gov.amsa.util.Files;
18  import rx.Observable;
19  import rx.Observable.Transformer;
20  import rx.functions.Action1;
21  import rx.functions.Action2;
22  import rx.functions.Func1;
23  
24  public final class Formats {
25  
26      private static final Logger log = LoggerFactory.getLogger(Formats.class);
27  
28      public static Observable<Integer> transform(final File input, final File output,
29              Pattern pattern, final Transformer<HasFix, HasFix> transformer,
30              final Action2<List<HasFix>, File> fixesWriter, final Func1<String, String> renamer) {
31          Preconditions.checkNotNull(input);
32          Preconditions.checkNotNull(output);
33          Preconditions.checkNotNull(pattern);
34          Preconditions.checkNotNull(transformer);
35          final List<File> files = Files.find(input, pattern);
36          long n = 0;
37          for (File file : files)
38              n += file.length();
39          final long totalSizeBytes = n;
40          log.info("transforming " + new DecimalFormat("0.000").format(totalSizeBytes / 1000000.0)
41                  + "MB");
42          final Action1<File> logger = new Action1<File>() {
43              final AtomicInteger count = new AtomicInteger();
44              final long startTime = System.currentTimeMillis();
45              final AtomicLong size = new AtomicLong();
46  
47              @Override
48              public void call(File f) {
49                  long t = System.currentTimeMillis();
50                  int n = count.incrementAndGet();
51                  long bytes = size.getAndAdd(f.length());
52                  double timeToFinishMins;
53                  if (n > 1) {
54                      timeToFinishMins = (t - startTime) / (double) (bytes) * (totalSizeBytes - bytes)
55                              / 1000.0 / 60.0;
56                  } else
57                      timeToFinishMins = -1;
58                  DecimalFormat df = new DecimalFormat("0.000");
59                  log.info("transforming " + n + " of " + files.size() + ":" + f + ", sizeMB="
60                          + df.format(f.length() / 1000000.0) + ", finish in mins="
61                          + df.format(timeToFinishMins));
62              }
63          };
64  
65          log.info("converting " + files.size() + " files" + " in " + input);
66          return Observable
67                  // get the files matching the pattern from the directory
68                  .from(files)
69                  // replace the file with a transformed version
70                  .flatMap(file -> {
71                      final File outputFile = rebase(file, input, output);
72                      outputFile.getParentFile().mkdirs();
73                      logger.call(file);
74                      return BinaryFixes.from(file, true, BinaryFixesFormat.WITHOUT_MMSI)
75                              // to list
76                              .toList()
77                              // flatten
78                              .flatMapIterable(Functions.<List<Fix>> identity())
79                              // transform the fixes
80                              .compose(transformer)
81                              // make into a list again
82                              .toList()
83                              // replace the file with sorted fixes
84                              .doOnNext(list -> {
85                          File f = new File(outputFile.getParentFile(),
86                                  renamer.call(outputFile.getName()));
87                          fixesWriter.call(list, f);
88                      })
89                              // count the fixes
90                              .count();
91                  });
92  
93      }
94  
95      @VisibleForTesting
96      static File rebase(File file, File existingParent, File newParent) {
97          if (file.getAbsolutePath().equals(existingParent.getAbsolutePath()))
98              return newParent;
99          else
100             return new File(rebase(file.getParentFile(), existingParent, newParent),
101                     file.getName());
102     }
103 
104 }