1 package au.gov.amsa.ihs.reader;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.HashMap;
6 import java.util.Map;
7
8 import javax.xml.parsers.SAXParser;
9 import javax.xml.parsers.SAXParserFactory;
10
11 import org.apache.log4j.Logger;
12 import org.xml.sax.Attributes;
13 import org.xml.sax.SAXException;
14 import org.xml.sax.helpers.DefaultHandler;
15
16 import com.google.common.base.Optional;
17
18 import rx.Observable.Operator;
19 import rx.Subscriber;
20
21 public class OperatorIhsReader implements Operator<Map<String, String>, InputStream> {
22
23 private static final Logger log = Logger.getLogger(OperatorIhsReader.class);
24 private final String parentElementName;
25
26 public OperatorIhsReader(String parentElementName) {
27 this.parentElementName = parentElementName;
28 }
29
30 @Override
31 public Subscriber<? super InputStream> call(
32 final Subscriber<? super Map<String, String>> child) {
33 return new Subscriber<InputStream>() {
34
35 @Override
36 public void onCompleted() {
37 child.onCompleted();
38 }
39
40 @Override
41 public void onError(Throwable e) {
42 child.onError(e);
43 }
44
45 @Override
46 public void onNext(InputStream is) {
47 try {
48 SAXParserFactory factory = SAXParserFactory.newInstance();
49 SAXParser parser = factory.newSAXParser();
50 DefaultHandler handler = createHandler(child, parentElementName);
51 parser.parse(is, handler);
52 } catch (UnsubscribedSAXException e) {
53 child.onCompleted();
54 } catch (Exception e) {
55 onError(e);
56 } finally {
57 try {
58 is.close();
59 } catch (IOException e) {
60
61 }
62 }
63 }
64 };
65 }
66
67 public static class UnsubscribedSAXException extends SAXException {
68
69 private static final long serialVersionUID = 1L;
70
71 }
72
73 public static class MyDefaultHandler extends DefaultHandler {
74
75 private int count = 0;
76 private final Map<String, String> values = new HashMap<String, String>();
77 private Optional<String> currentElement = Optional.absent();
78 private final Subscriber<? super Map<String, String>> subscriber;
79 private final String parentElementName;
80
81 public MyDefaultHandler(Subscriber<? super Map<String, String>> subscriber,
82 String parentElementName) {
83 this.subscriber = subscriber;
84 this.parentElementName = parentElementName;
85 }
86
87 @Override
88 public void startElement(String uri, String localName, String qName, Attributes attributes)
89 throws SAXException {
90 checkSubscription(subscriber);
91 if (parentElementName.equals(qName)) {
92 values.clear();
93 count++;
94 if (count % 1000 == 0)
95 log.info(count + " records read");
96 }
97 currentElement = Optional.of(qName);
98 }
99
100 private void checkSubscription(final Subscriber<? super Map<String, String>> subscriber)
101 throws UnsubscribedSAXException {
102 if (subscriber.isUnsubscribed())
103 throw new UnsubscribedSAXException();
104 }
105
106 @Override
107 public void characters(char[] ch, int start, int length) throws SAXException {
108 checkSubscription(subscriber);
109 String val = new String(ch, start, length).trim();
110 if (val.length() > 0) {
111 String currentValue = values.get(currentElement.get());
112 if (currentValue == null)
113 values.put(currentElement.get(), val);
114 else
115 values.put(currentElement.get(), currentValue + val);
116 }
117 }
118
119 @Override
120 public void endElement(String uri, String localName, String qName) throws SAXException {
121 checkSubscription(subscriber);
122 if (parentElementName.equals(qName)) {
123 try {
124 subscriber.onNext(new HashMap<>(values));
125 } catch (RuntimeException e) {
126 throw new RuntimeException("error building Ship from " + values, e);
127 }
128 }
129 }
130
131 }
132
133 private static DefaultHandler createHandler(
134 final Subscriber<? super Map<String, String>> subscriber, String parentElementName) {
135 return new MyDefaultHandler(subscriber, parentElementName);
136 }
137
138 }