1 package au.gov.amsa.risky.format;
2
3 import java.io.File;
4 import java.text.DecimalFormat;
5 import java.util.List;
6 import java.util.concurrent.atomic.AtomicInteger;
7 import java.util.concurrent.atomic.AtomicLong;
8 import java.util.regex.Pattern;
9
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import com.github.davidmoten.rx.Functions;
14 import com.github.davidmoten.util.Preconditions;
15 import com.google.common.annotations.VisibleForTesting;
16
17 import au.gov.amsa.util.Files;
18 import rx.Observable;
19 import rx.Observable.Transformer;
20 import rx.functions.Action1;
21 import rx.functions.Action2;
22 import rx.functions.Func1;
23
24 public final class Formats {
25
26 private static final Logger log = LoggerFactory.getLogger(Formats.class);
27
28 public static Observable<Integer> transform(final File input, final File output,
29 Pattern pattern, final Transformer<HasFix, HasFix> transformer,
30 final Action2<List<HasFix>, File> fixesWriter, final Func1<String, String> renamer) {
31 Preconditions.checkNotNull(input);
32 Preconditions.checkNotNull(output);
33 Preconditions.checkNotNull(pattern);
34 Preconditions.checkNotNull(transformer);
35 final List<File> files = Files.find(input, pattern);
36 long n = 0;
37 for (File file : files)
38 n += file.length();
39 final long totalSizeBytes = n;
40 log.info("transforming " + new DecimalFormat("0.000").format(totalSizeBytes / 1000000.0)
41 + "MB");
42 final Action1<File> logger = new Action1<File>() {
43 final AtomicInteger count = new AtomicInteger();
44 final long startTime = System.currentTimeMillis();
45 final AtomicLong size = new AtomicLong();
46
47 @Override
48 public void call(File f) {
49 long t = System.currentTimeMillis();
50 int n = count.incrementAndGet();
51 long bytes = size.getAndAdd(f.length());
52 double timeToFinishMins;
53 if (n > 1) {
54 timeToFinishMins = (t - startTime) / (double) (bytes) * (totalSizeBytes - bytes)
55 / 1000.0 / 60.0;
56 } else
57 timeToFinishMins = -1;
58 DecimalFormat df = new DecimalFormat("0.000");
59 log.info("transforming " + n + " of " + files.size() + ":" + f + ", sizeMB="
60 + df.format(f.length() / 1000000.0) + ", finish in mins="
61 + df.format(timeToFinishMins));
62 }
63 };
64
65 log.info("converting " + files.size() + " files" + " in " + input);
66 return Observable
67
68 .from(files)
69
70 .flatMap(file -> {
71 final File outputFile = rebase(file, input, output);
72 outputFile.getParentFile().mkdirs();
73 logger.call(file);
74 return BinaryFixes.from(file, true, BinaryFixesFormat.WITHOUT_MMSI)
75
76 .toList()
77
78 .flatMapIterable(Functions.<List<Fix>> identity())
79
80 .compose(transformer)
81
82 .toList()
83
84 .doOnNext(list -> {
85 File f = new File(outputFile.getParentFile(),
86 renamer.call(outputFile.getName()));
87 fixesWriter.call(list, f);
88 })
89
90 .count();
91 });
92
93 }
94
95 @VisibleForTesting
96 static File rebase(File file, File existingParent, File newParent) {
97 if (file.getAbsolutePath().equals(existingParent.getAbsolutePath()))
98 return newParent;
99 else
100 return new File(rebase(file.getParentFile(), existingParent, newParent),
101 file.getName());
102 }
103
104 }