View Javadoc
1   package au.gov.amsa.ihs.reader;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.util.HashMap;
6   import java.util.Map;
7   
8   import javax.xml.parsers.SAXParser;
9   import javax.xml.parsers.SAXParserFactory;
10  
11  import org.apache.log4j.Logger;
12  import org.xml.sax.Attributes;
13  import org.xml.sax.SAXException;
14  import org.xml.sax.helpers.DefaultHandler;
15  
16  import com.google.common.base.Optional;
17  
18  import rx.Observable.Operator;
19  import rx.Subscriber;
20  
21  public class OperatorIhsReader implements Operator<Map<String, String>, InputStream> {
22  
23      private static final Logger log = Logger.getLogger(OperatorIhsReader.class);
24      private final String parentElementName;
25  
26      public OperatorIhsReader(String parentElementName) {
27          this.parentElementName = parentElementName;
28      }
29  
30      @Override
31      public Subscriber<? super InputStream> call(
32              final Subscriber<? super Map<String, String>> child) {
33          return new Subscriber<InputStream>() {
34  
35              @Override
36              public void onCompleted() {
37                  child.onCompleted();
38              }
39  
40              @Override
41              public void onError(Throwable e) {
42                  child.onError(e);
43              }
44  
45              @Override
46              public void onNext(InputStream is) {
47                  try {
48                      SAXParserFactory factory = SAXParserFactory.newInstance();
49                      SAXParser parser = factory.newSAXParser();
50                      DefaultHandler handler = createHandler(child, parentElementName);
51                      parser.parse(is, handler);
52                  } catch (UnsubscribedSAXException e) {
53                      child.onCompleted();
54                  } catch (Exception e) {
55                      onError(e);
56                  } finally {
57                      try {
58                          is.close();
59                      } catch (IOException e) {
60                          // ignore
61                      }
62                  }
63              }
64          };
65      }
66  
67      public static class UnsubscribedSAXException extends SAXException {
68  
69          private static final long serialVersionUID = 1L;
70  
71      }
72  
73      public static class MyDefaultHandler extends DefaultHandler {
74  
75          private int count = 0;
76          private final Map<String, String> values = new HashMap<String, String>();
77          private Optional<String> currentElement = Optional.absent();
78          private final Subscriber<? super Map<String, String>> subscriber;
79          private final String parentElementName;
80  
81          public MyDefaultHandler(Subscriber<? super Map<String, String>> subscriber,
82                  String parentElementName) {
83              this.subscriber = subscriber;
84              this.parentElementName = parentElementName;
85          }
86  
87          @Override
88          public void startElement(String uri, String localName, String qName, Attributes attributes)
89                  throws SAXException {
90              checkSubscription(subscriber);
91              if (parentElementName.equals(qName)) {
92                  values.clear();
93                  count++;
94                  if (count % 1000 == 0)
95                      log.info(count + " records read");
96              }
97              currentElement = Optional.of(qName);
98          }
99  
100         private void checkSubscription(final Subscriber<? super Map<String, String>> subscriber)
101                 throws UnsubscribedSAXException {
102             if (subscriber.isUnsubscribed())
103                 throw new UnsubscribedSAXException();
104         }
105 
106         @Override
107         public void characters(char[] ch, int start, int length) throws SAXException {
108             checkSubscription(subscriber);
109             String val = new String(ch, start, length).trim();
110             if (val.length() > 0) {
111                 String currentValue = values.get(currentElement.get());
112                 if (currentValue == null)
113                     values.put(currentElement.get(), val);
114                 else
115                     values.put(currentElement.get(), currentValue + val);
116             }
117         }
118 
119         @Override
120         public void endElement(String uri, String localName, String qName) throws SAXException {
121             checkSubscription(subscriber);
122             if (parentElementName.equals(qName)) {
123                 try {
124                     subscriber.onNext(new HashMap<>(values));
125                 } catch (RuntimeException e) {
126                     throw new RuntimeException("error building Ship from " + values, e);
127                 }
128             }
129         }
130 
131     }
132 
133     private static DefaultHandler createHandler(
134             final Subscriber<? super Map<String, String>> subscriber, String parentElementName) {
135         return new MyDefaultHandler(subscriber, parentElementName);
136     }
137 
138 }