Loading src/main/java/es/redmic/vesselrestrictionchecker/VesselRestrictionCheckerApplication.java +53 −5 Original line number Diff line number Diff line Loading @@ -45,7 +45,13 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase private static final String GEO_HASH_KEY = "geohash", RESULT_GEOMETRY_PROPERTY = "geometry", RESULT_VESSEL_MMSI_PROPERTY = "vesselMmsi"; RESULT_VESSEL_MMSI_PROPERTY = "vesselMmsi", VESSEL_TYPE_PROPERTY = "vesselType", MAX_SPEED_PROPERTY = "maxSpeed", VESSEL_TYPES_RESTRICTED_PROPERTY = "vesselTypesRestricted", VESSEL_SPEED_PROPERTY = "sog"; private static final Double SPEED_TOLERANCE = 2.0; // @formatter:on Loading Loading @@ -97,6 +103,8 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase Joined.valueSerde(getGenericAvroSerde())) .flatMapValues(value -> value).selectKey((k, v) -> v.get(RESULT_VESSEL_MMSI_PROPERTY)) .to(resultTopic, Produced.with(null, getSpecificAvroSerde())); // TODO: Agregar alertas en los últimos x minutos. De esta forma se evitarán // falsas alarmas. return builder.build(); } Loading Loading @@ -128,8 +136,9 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase // Se crea un nuevo registro con el geohash code y solo con la info necesaria avroRecord.put("mmsi", AvroUtils.getSpecificRecordProperty(value, "mmsi").toString()); avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name").toString()); avroRecord.put("dateTime", Long.parseLong(AvroUtils.getSpecificRecordProperty(value, "tstamp").toString())); avroRecord.put("vesselType", Integer.parseInt(AvroUtils.getSpecificRecordProperty(value, "type").toString())); avroRecord.put("dateTime", AvroUtils.getSpecificRecordProperty(value, "tstamp")); avroRecord.put("vesselType", AvroUtils.getSpecificRecordProperty(value, "type")); avroRecord.put("sog", AvroUtils.getSpecificRecordProperty(value, "sog")); return avroRecord; } Loading @@ -156,6 +165,9 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase avroRecord.put(RESULT_GEOMETRY_PROPERTY, geometry.toString()); avroRecord.put("id", AvroUtils.getSpecificRecordProperty(value, "id").toString()); avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name").toString()); avroRecord.put(VESSEL_TYPES_RESTRICTED_PROPERTY, AvroUtils.getSpecificRecordProperty(value, VESSEL_TYPES_RESTRICTED_PROPERTY)); avroRecord.put(MAX_SPEED_PROPERTY, AvroUtils.getSpecificRecordProperty(value, MAX_SPEED_PROPERTY)); avroRecord.put(GEO_HASH_KEY, geoHash); values.add(avroRecord); } Loading Loading @@ -201,7 +213,7 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase // TODO: analizar si es necesario seguir procesando elementos una vez encontrada // un área. // Al menos no seguir procesando elementos de la misma área if (GeoUtils.shapeContainsGeometry(area, point)) { if (GeoUtils.shapeContainsGeometry(area, point) && !fulfillNavigationConstraints(pointRecord, areaRecord)) { // Se crea una alerta con la info básica del punto y del área donde se encuentra PointInAreaAlert pointInAreaAlert = new PointInAreaAlert(); Loading @@ -211,16 +223,52 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase pointInAreaAlert.setDateTime( new DateTime(Long.parseLong(pointRecord.get("dateTime").toString()), DateTimeZone.UTC)); pointInAreaAlert.setVesselType(Integer.parseInt(pointRecord.get("vesselType").toString())); pointInAreaAlert.setSog((Double) pointRecord.get("sog")); pointInAreaAlert.setAreaId(areaRecord.get("id").toString()); pointInAreaAlert.setAreaName(areaRecord.get("name").toString()); result.add(pointInAreaAlert); } } return result; } @SuppressWarnings({ "rawtypes" }) private boolean fulfillNavigationConstraints(GenericRecord pointRecord, GenericRecord areaRecord) { Integer vesselType = (Integer) pointRecord.get(VESSEL_TYPE_PROPERTY); Double vesselSpeed = pointRecord.get(VESSEL_SPEED_PROPERTY) != null ? ((Double) pointRecord.get(VESSEL_SPEED_PROPERTY)) : null; List<?> vesselTypesRestricted = areaRecord.get(VESSEL_TYPES_RESTRICTED_PROPERTY) != null ? ((List) areaRecord.get(VESSEL_TYPES_RESTRICTED_PROPERTY)) : null; Double maxSpeed = areaRecord.get(MAX_SPEED_PROPERTY) != null ? ((Double) areaRecord.get(MAX_SPEED_PROPERTY)) : null; boolean fulfillVesselTypeConstraintResult = fulfillVesselTypeConstraint(vesselType, vesselTypesRestricted), fulfillSpeedConstraintResult = fulfillSpeedConstraint(vesselSpeed, maxSpeed); return (fulfillVesselTypeConstraintResult && fulfillSpeedConstraintResult) || (vesselTypesRestricted == null && maxSpeed != null && fulfillSpeedConstraintResult); } private boolean fulfillVesselTypeConstraint(Integer vesselType, List<?> vesselTypesRestricted) { if (vesselTypesRestricted == null || vesselTypesRestricted.size() == 0) return false; return (vesselTypesRestricted.stream().filter(item -> item.toString().equals(vesselType.toString())) .count() == 0); } private boolean fulfillSpeedConstraint(Double vesselSpeed, Double maxSpeed) { return maxSpeed == null || vesselSpeed != null && (vesselSpeed + SPEED_TOLERANCE <= maxSpeed); } public static void main(String[] args) { Map<String, Object> env = getEnvVariables(requiredVariables); Loading src/main/java/es/redmic/vesselrestrictionchecker/dto/PointInAreaAlert.java +17 −1 Original line number Diff line number Diff line Loading @@ -18,7 +18,8 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas + "{\"name\":\"vesselName\",\"type\":\"string\"}," + "{\"name\":\"geometry\",\"type\":\"string\"}," + "{\"name\":\"dateTime\",\"type\":{ \"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}," + "{\"name\":\"vesselType\",\"type\":\"int\"}]}"); + "{\"name\":\"vesselType\",\"type\":\"int\"}," + "{\"name\":\"sog\",\"type\":[\"double\", \"null\"]}]}"); //@formatter:on private String areaId; Loading @@ -35,6 +36,8 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas private Integer vesselType; private Double sog; public String getAreaId() { return areaId; } Loading Loading @@ -91,6 +94,14 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas this.vesselType = vesselType; } public Double getSog() { return sog; } public void setSog(Double sog) { this.sog = sog; } @Override public Schema getSchema() { return SCHEMA$; Loading @@ -113,6 +124,8 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas return dateTime.getMillis(); case 6: return vesselType; case 7: return sog; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } Loading Loading @@ -142,6 +155,9 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas case 6: vesselType = (java.lang.Integer) value; break; case 7: sog = value != null ? (Double) value : null; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } Loading src/main/java/es/redmic/vesselrestrictionchecker/dto/SimpleArea.java +26 −0 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker.dto; import java.util.List; import org.apache.avro.reflect.Nullable; public class SimpleArea { private String id; Loading @@ -10,6 +14,12 @@ public class SimpleArea { private String name; @Nullable private List<String> vesselTypesRestricted; @Nullable private Double maxSpeed; public String getId() { return id; } Loading Loading @@ -41,4 +51,20 @@ public class SimpleArea { public void setName(String name) { this.name = name; } public List<String> getVesselTypesRestricted() { return vesselTypesRestricted; } public void setVesselTypesRestricted(List<String> vesselTypesRestricted) { this.vesselTypesRestricted = vesselTypesRestricted; } public Double getMaxSpeed() { return maxSpeed; } public void setMaxSpeed(Double maxSpeed) { this.maxSpeed = maxSpeed; } } src/main/java/es/redmic/vesselrestrictionchecker/dto/SimplePoint.java +10 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,8 @@ public class SimplePoint { private Integer vesselType; private Double sog; public String getMmsi() { return mmsi; } Loading Loading @@ -61,4 +63,12 @@ public class SimplePoint { public void setVesselType(Integer vesselType) { this.vesselType = vesselType; } public Double getSog() { return sog; } public void setSog(Double sog) { this.sog = sog; } } src/test/java/es/redmic/vesselrestrictionchecker/streams/AISTrackingDTO.java +17 −1 Original line number Diff line number Diff line Loading @@ -16,7 +16,8 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase + "{\"name\":\"latitude\",\"type\":\"double\"}," + "{\"name\":\"longitude\",\"type\":\"double\"}," + "{\"name\":\"type\",\"type\":\"int\"}," + "{\"name\":\"name\",\"type\":\"string\"}]}"); + "{\"name\":\"name\",\"type\":\"string\"}," + "{\"name\":\"sog\",\"type\":[\"double\", \"null\"]}]}"); //@formatter:on private Integer mmsi; Loading @@ -31,6 +32,8 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase private String name; private Double sog; public AISTrackingDTO() { } Loading Loading @@ -86,6 +89,14 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase this.name = name; } public Double getSog() { return sog; } public void setSog(Double sog) { this.sog = sog; } @Override public org.apache.avro.Schema getSchema() { return SCHEMA$; Loading @@ -107,6 +118,8 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase return type; case 5: return name; case 6: return sog; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } Loading Loading @@ -134,6 +147,9 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase case 5: name = value$ != null ? value$.toString() : null; break; case 6: sog = value$ != null ? (Double) value$ : null; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } Loading Loading
src/main/java/es/redmic/vesselrestrictionchecker/VesselRestrictionCheckerApplication.java +53 −5 Original line number Diff line number Diff line Loading @@ -45,7 +45,13 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase private static final String GEO_HASH_KEY = "geohash", RESULT_GEOMETRY_PROPERTY = "geometry", RESULT_VESSEL_MMSI_PROPERTY = "vesselMmsi"; RESULT_VESSEL_MMSI_PROPERTY = "vesselMmsi", VESSEL_TYPE_PROPERTY = "vesselType", MAX_SPEED_PROPERTY = "maxSpeed", VESSEL_TYPES_RESTRICTED_PROPERTY = "vesselTypesRestricted", VESSEL_SPEED_PROPERTY = "sog"; private static final Double SPEED_TOLERANCE = 2.0; // @formatter:on Loading Loading @@ -97,6 +103,8 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase Joined.valueSerde(getGenericAvroSerde())) .flatMapValues(value -> value).selectKey((k, v) -> v.get(RESULT_VESSEL_MMSI_PROPERTY)) .to(resultTopic, Produced.with(null, getSpecificAvroSerde())); // TODO: Agregar alertas en los últimos x minutos. De esta forma se evitarán // falsas alarmas. return builder.build(); } Loading Loading @@ -128,8 +136,9 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase // Se crea un nuevo registro con el geohash code y solo con la info necesaria avroRecord.put("mmsi", AvroUtils.getSpecificRecordProperty(value, "mmsi").toString()); avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name").toString()); avroRecord.put("dateTime", Long.parseLong(AvroUtils.getSpecificRecordProperty(value, "tstamp").toString())); avroRecord.put("vesselType", Integer.parseInt(AvroUtils.getSpecificRecordProperty(value, "type").toString())); avroRecord.put("dateTime", AvroUtils.getSpecificRecordProperty(value, "tstamp")); avroRecord.put("vesselType", AvroUtils.getSpecificRecordProperty(value, "type")); avroRecord.put("sog", AvroUtils.getSpecificRecordProperty(value, "sog")); return avroRecord; } Loading @@ -156,6 +165,9 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase avroRecord.put(RESULT_GEOMETRY_PROPERTY, geometry.toString()); avroRecord.put("id", AvroUtils.getSpecificRecordProperty(value, "id").toString()); avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name").toString()); avroRecord.put(VESSEL_TYPES_RESTRICTED_PROPERTY, AvroUtils.getSpecificRecordProperty(value, VESSEL_TYPES_RESTRICTED_PROPERTY)); avroRecord.put(MAX_SPEED_PROPERTY, AvroUtils.getSpecificRecordProperty(value, MAX_SPEED_PROPERTY)); avroRecord.put(GEO_HASH_KEY, geoHash); values.add(avroRecord); } Loading Loading @@ -201,7 +213,7 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase // TODO: analizar si es necesario seguir procesando elementos una vez encontrada // un área. // Al menos no seguir procesando elementos de la misma área if (GeoUtils.shapeContainsGeometry(area, point)) { if (GeoUtils.shapeContainsGeometry(area, point) && !fulfillNavigationConstraints(pointRecord, areaRecord)) { // Se crea una alerta con la info básica del punto y del área donde se encuentra PointInAreaAlert pointInAreaAlert = new PointInAreaAlert(); Loading @@ -211,16 +223,52 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase pointInAreaAlert.setDateTime( new DateTime(Long.parseLong(pointRecord.get("dateTime").toString()), DateTimeZone.UTC)); pointInAreaAlert.setVesselType(Integer.parseInt(pointRecord.get("vesselType").toString())); pointInAreaAlert.setSog((Double) pointRecord.get("sog")); pointInAreaAlert.setAreaId(areaRecord.get("id").toString()); pointInAreaAlert.setAreaName(areaRecord.get("name").toString()); result.add(pointInAreaAlert); } } return result; } @SuppressWarnings({ "rawtypes" }) private boolean fulfillNavigationConstraints(GenericRecord pointRecord, GenericRecord areaRecord) { Integer vesselType = (Integer) pointRecord.get(VESSEL_TYPE_PROPERTY); Double vesselSpeed = pointRecord.get(VESSEL_SPEED_PROPERTY) != null ? ((Double) pointRecord.get(VESSEL_SPEED_PROPERTY)) : null; List<?> vesselTypesRestricted = areaRecord.get(VESSEL_TYPES_RESTRICTED_PROPERTY) != null ? ((List) areaRecord.get(VESSEL_TYPES_RESTRICTED_PROPERTY)) : null; Double maxSpeed = areaRecord.get(MAX_SPEED_PROPERTY) != null ? ((Double) areaRecord.get(MAX_SPEED_PROPERTY)) : null; boolean fulfillVesselTypeConstraintResult = fulfillVesselTypeConstraint(vesselType, vesselTypesRestricted), fulfillSpeedConstraintResult = fulfillSpeedConstraint(vesselSpeed, maxSpeed); return (fulfillVesselTypeConstraintResult && fulfillSpeedConstraintResult) || (vesselTypesRestricted == null && maxSpeed != null && fulfillSpeedConstraintResult); } private boolean fulfillVesselTypeConstraint(Integer vesselType, List<?> vesselTypesRestricted) { if (vesselTypesRestricted == null || vesselTypesRestricted.size() == 0) return false; return (vesselTypesRestricted.stream().filter(item -> item.toString().equals(vesselType.toString())) .count() == 0); } private boolean fulfillSpeedConstraint(Double vesselSpeed, Double maxSpeed) { return maxSpeed == null || vesselSpeed != null && (vesselSpeed + SPEED_TOLERANCE <= maxSpeed); } public static void main(String[] args) { Map<String, Object> env = getEnvVariables(requiredVariables); Loading
src/main/java/es/redmic/vesselrestrictionchecker/dto/PointInAreaAlert.java +17 −1 Original line number Diff line number Diff line Loading @@ -18,7 +18,8 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas + "{\"name\":\"vesselName\",\"type\":\"string\"}," + "{\"name\":\"geometry\",\"type\":\"string\"}," + "{\"name\":\"dateTime\",\"type\":{ \"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}," + "{\"name\":\"vesselType\",\"type\":\"int\"}]}"); + "{\"name\":\"vesselType\",\"type\":\"int\"}," + "{\"name\":\"sog\",\"type\":[\"double\", \"null\"]}]}"); //@formatter:on private String areaId; Loading @@ -35,6 +36,8 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas private Integer vesselType; private Double sog; public String getAreaId() { return areaId; } Loading Loading @@ -91,6 +94,14 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas this.vesselType = vesselType; } public Double getSog() { return sog; } public void setSog(Double sog) { this.sog = sog; } @Override public Schema getSchema() { return SCHEMA$; Loading @@ -113,6 +124,8 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas return dateTime.getMillis(); case 6: return vesselType; case 7: return sog; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } Loading Loading @@ -142,6 +155,9 @@ public class PointInAreaAlert extends org.apache.avro.specific.SpecificRecordBas case 6: vesselType = (java.lang.Integer) value; break; case 7: sog = value != null ? (Double) value : null; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } Loading
src/main/java/es/redmic/vesselrestrictionchecker/dto/SimpleArea.java +26 −0 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker.dto; import java.util.List; import org.apache.avro.reflect.Nullable; public class SimpleArea { private String id; Loading @@ -10,6 +14,12 @@ public class SimpleArea { private String name; @Nullable private List<String> vesselTypesRestricted; @Nullable private Double maxSpeed; public String getId() { return id; } Loading Loading @@ -41,4 +51,20 @@ public class SimpleArea { public void setName(String name) { this.name = name; } public List<String> getVesselTypesRestricted() { return vesselTypesRestricted; } public void setVesselTypesRestricted(List<String> vesselTypesRestricted) { this.vesselTypesRestricted = vesselTypesRestricted; } public Double getMaxSpeed() { return maxSpeed; } public void setMaxSpeed(Double maxSpeed) { this.maxSpeed = maxSpeed; } }
src/main/java/es/redmic/vesselrestrictionchecker/dto/SimplePoint.java +10 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,8 @@ public class SimplePoint { private Integer vesselType; private Double sog; public String getMmsi() { return mmsi; } Loading Loading @@ -61,4 +63,12 @@ public class SimplePoint { public void setVesselType(Integer vesselType) { this.vesselType = vesselType; } public Double getSog() { return sog; } public void setSog(Double sog) { this.sog = sog; } }
src/test/java/es/redmic/vesselrestrictionchecker/streams/AISTrackingDTO.java +17 −1 Original line number Diff line number Diff line Loading @@ -16,7 +16,8 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase + "{\"name\":\"latitude\",\"type\":\"double\"}," + "{\"name\":\"longitude\",\"type\":\"double\"}," + "{\"name\":\"type\",\"type\":\"int\"}," + "{\"name\":\"name\",\"type\":\"string\"}]}"); + "{\"name\":\"name\",\"type\":\"string\"}," + "{\"name\":\"sog\",\"type\":[\"double\", \"null\"]}]}"); //@formatter:on private Integer mmsi; Loading @@ -31,6 +32,8 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase private String name; private Double sog; public AISTrackingDTO() { } Loading Loading @@ -86,6 +89,14 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase this.name = name; } public Double getSog() { return sog; } public void setSog(Double sog) { this.sog = sog; } @Override public org.apache.avro.Schema getSchema() { return SCHEMA$; Loading @@ -107,6 +118,8 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase return type; case 5: return name; case 6: return sog; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } Loading Loading @@ -134,6 +147,9 @@ public class AISTrackingDTO extends org.apache.avro.specific.SpecificRecordBase case 5: name = value$ != null ? value$.toString() : null; break; case 6: sog = value$ != null ? (Double) value$ : null; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } Loading