View Javadoc
1   package au.gov.amsa.navigation;
2   
3   import static com.google.common.base.Optional.absent;
4   
5   import java.util.Collection;
6   import java.util.Collections;
7   import java.util.Comparator;
8   import java.util.HashMap;
9   import java.util.Iterator;
10  import java.util.Map;
11  
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import com.github.davidmoten.rtree.RTree;
16  import com.github.davidmoten.rtree.geometry.Geometries;
17  import com.github.davidmoten.rtree.geometry.Point;
18  import com.google.common.base.Optional;
19  import com.google.common.collect.ArrayListMultimap;
20  import com.google.common.collect.ListMultimap;
21  
22  import au.gov.amsa.navigation.VesselPosition.NavigationalStatus;
23  import fj.Equal;
24  import fj.F;
25  import fj.Ord;
26  import fj.Ordering;
27  import fj.P3;
28  import fj.data.Option;
29  import fj.data.Set;
30  
31  class State {
32  
33      private static Logger log = LoggerFactory.getLogger(State.class);
34  
35      private static final int R_TREE_MAX_CHILDREN = 10;
36      private final Map<Identifier, Set<VesselPosition>> map;
37      private final RTree<VesselPosition, Point> tree;
38      private final Optional<VesselPosition> last;
39      private final long counter;
40      private final Set<VesselPosition> byTimeAndId;
41  
42      private State(Map<Identifier, Set<VesselPosition>> map, Set<VesselPosition> byTimeAndId,
43              RTree<VesselPosition, Point> tree, Optional<VesselPosition> last, long counter) {
44          this.map = Collections.unmodifiableMap(map);
45          this.byTimeAndId = byTimeAndId;
46          this.tree = tree;
47          this.last = last;
48          this.counter = counter;
49      }
50  
51      State() {
52          this(new HashMap<Identifier, Set<VesselPosition>>(), EMPTY,
53                  RTree.star().maxChildren(R_TREE_MAX_CHILDREN).<VesselPosition, Point> create(),
54                  Optional.<VesselPosition> absent(), 0);
55      }
56  
57      private static Ord<VesselPosition> ordering = toOrdering(Comparators.timeIdMessageIdComparator);
58      private static Set<VesselPosition> EMPTY = Set.empty(ordering);
59  
60      Optional<VesselPosition> nextPosition() {
61          if (last.isPresent())
62              return next(last.get());
63          else
64              return Optional.absent();
65      }
66  
67      private Optional<VesselPosition> next(VesselPosition p) {
68          Iterator<VesselPosition> it = map.get(p.id()).split(p)._3().iterator();
69          return it.hasNext() ? Optional.of(it.next()) : Optional.<VesselPosition> absent();
70      }
71  
72      RTree<VesselPosition, Point> tree() {
73          return tree;
74      }
75  
76      Optional<VesselPosition> last() {
77          return last;
78      }
79  
80      private static Set<VesselPosition> add(Set<VesselPosition> set, VesselPosition t) {
81          return set.insert(t);
82      }
83  
84      private static final VesselPosition.Builder BUILDER = VesselPosition.builder()
85              .positionAisNmea(absent()).cls(VesselClass.A).cogDegrees(Optional.of(0.0))
86              .headingDegrees(absent()).data(absent()).lat(0.0).lon(0.0).lengthMetres(absent())
87              .navigationalStatus(NavigationalStatus.NOT_DEFINED).shipStaticAisNmea(absent())
88              .shipType(absent()).speedMetresPerSecond(Optional.of(0.0)).widthMetres(absent())
89              .id(new Mmsi(0));
90  
91      State nextState(final long maxTimeInterval, VesselPosition p) {
92  
93          // add p to byTime, tree and map
94          Set<VesselPosition> newByTime = add(byTimeAndId, p);
95          RTree<VesselPosition, Point> newTree = tree.add(p, geometry(p));
96          Map<Identifier, Set<VesselPosition>> newMap = new HashMap<>(map);
97          addToMap(newMap, p);
98  
99          // remove expired vessel positions
100         if (counter % 10000 == 0) {
101             // remove items from the map
102             long t = System.currentTimeMillis();
103             P3<Set<VesselPosition>, Option<VesselPosition>, Set<VesselPosition>> split = newByTime
104                     .split(BUILDER.time(p.time() - maxTimeInterval).build());
105             Set<VesselPosition> removeThese = split._1();
106             // remove items from the byTime set
107             newByTime = split._3();
108 
109             ListMultimap<Identifier, VesselPosition> lists = ArrayListMultimap.create();
110             for (VesselPosition vp : removeThese) {
111                 lists.put(vp.id(), vp);
112             }
113 
114             int count = 0;
115             for (Collection<VesselPosition> list : lists.asMap().values()) {
116                 Identifier id = list.iterator().next().id();
117                 Set<VesselPosition> removals = Set.iterableSet(ordering, list);
118                 Set<VesselPosition> set = newMap.get(id);
119                 set = set.minus(removals);
120                 if (set.size() == 0)
121                     newMap.remove(id);
122                 else
123                     newMap.put(id, set);
124                 count += list.size();
125             }
126             // log.info("removed " + count + " from map");
127 
128             // remove items from the tree
129             // log.info("removing from tree");
130             for (VesselPosition vp : removeThese) {
131                 newTree = newTree.delete(vp, geometry(vp));
132             }
133             // log.info("removed from tree");
134             if (newTree.size() != newByTime.size())
135                 throw new RuntimeException("unexpected");
136             t = System.currentTimeMillis() - t;
137             log.info("removed " + count + " in " + t + "ms");
138         }
139         return new State(newMap, newByTime, newTree, Optional.of(p), counter + 1);
140     }
141 
142     private static void addToMap(Map<Identifier, Set<VesselPosition>> map, VesselPosition p) {
143         Optional<Set<VesselPosition>> existing = Optional.fromNullable(map.get(p.id()));
144         if (existing.isPresent())
145             map.put(p.id(), add(existing.get(), p));
146         else
147             map.put(p.id(), EMPTY.insert(p));
148     }
149 
150     private static Point geometry(VesselPosition p) {
151         return Geometries.point(p.lon(), p.lat());
152     }
153 
154     public int mapSize() {
155         return map.size();
156     }
157 
158     private static final Equal<VesselPosition> EQUAL_ID = Equal
159             .equal(new F<VesselPosition, F<VesselPosition, Boolean>>() {
160                 @Override
161                 public F<VesselPosition, Boolean> f(final VesselPosition a) {
162                     return new F<VesselPosition, Boolean>() {
163 
164                         @Override
165                         public Boolean f(VesselPosition b) {
166                             return a.id().equals(b.id());
167                         }
168                     };
169                 }
170             });
171 
172     private static <A> Ord<A> toOrdering(final Comparator<A> comparator) {
173         return Ord.ord(new F<A, F<A, Ordering>>() {
174             @Override
175             public F<A, Ordering> f(final A a1) {
176                 return new F<A, Ordering>() {
177                     @Override
178                     public Ordering f(final A a2) {
179                         final int x = comparator.compare(a1, a2);
180                         return x < 0 ? Ordering.LT : x == 0 ? Ordering.EQ : Ordering.GT;
181                     }
182                 };
183             }
184         });
185     }
186 
187 }