Commit ee9ecb93 authored by Noel Alonso's avatar Noel Alonso
Browse files

Cambia estructura de streams

Debido a algunos fallos derivados de la topología definida, ha sido
necesario redefinir algunos detalles de los streams
parent 9ad00484
Loading
Loading
Loading
Loading
+51 −37
Original line number Diff line number Diff line
@@ -9,11 +9,10 @@ import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -32,6 +31,8 @@ import es.redmic.vesselrestrictionchecker.dto.SimpleArea;
import es.redmic.vesselrestrictionchecker.dto.SimplePoint;
import es.redmic.vesselrestrictionchecker.utils.AvroUtils;
import es.redmic.vesselrestrictionchecker.utils.GeoUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class VesselRestrictionCheckerApplication extends StreamsApplicationBase {

@@ -59,43 +60,52 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
		}
	};

	public VesselRestrictionCheckerApplication(String schemaRegistryUrl) {
		super(schemaRegistryUrl);
	public VesselRestrictionCheckerApplication(SchemaRegistryClient schemaRegistryClient, String schemaRegistryUrl) {
		super(schemaRegistryClient, schemaRegistryUrl);
	}

	public Topology getTopology(String pointsTopic, String areasTopic, String resultTopic) {

		StreamsBuilder builder = new StreamsBuilder();

		HashMapSerde<String, GenericRecord> hashMapSerde = new HashMapSerde<String, GenericRecord>(schemaRegistryUrl);
		HashMapSerde<String, GenericRecord> hashMapSerde = new HashMapSerde<String, GenericRecord>(schemaRegistryClient,
				schemaRegistryUrl);

		KStream<Object, SpecificRecord> areasStream = builder.stream(areasTopic, Consumed.with(null, getValueSerde()));
		KStream<String, SpecificRecord> areasStream = builder.stream(areasTopic,
				Consumed.with(null, getSpecificAvroSerde()));

		KStream<Object, SpecificRecord> pointsStream = builder.stream(pointsTopic,
				Consumed.with(null, getValueSerde()));
		KStream<String, SpecificRecord> pointsStream = builder.stream(pointsTopic,
				Consumed.with(null, getSpecificAvroSerde()));

		KTable<Object, SpecificRecord> areasKTable = areasStream.groupByKey(Serialized.with(null, getValueSerde()))
				.reduce((aggValue, newValue) -> newValue);
		KStream<String, SpecificRecord> lastAreasKStream = areasStream
				.groupByKey(Serialized.with(null, getSpecificAvroSerde()))
				.reduce((aggValue, newValue) -> newValue, Materialized.with(null, getSpecificAvroSerde())).toStream();

		KStream<String, GenericRecord> areasKStreamEnriched = areasKTable.toStream()
				.flatMapValues((value) -> enrichAreaWithGeoHash(value))
				.selectKey((key, value) -> value.get(GEO_HASH_KEY).toString());
		KStream<String, GenericRecord> areasKStreamEnriched = lastAreasKStream
				.flatMapValues((value) -> enrichAreaWithGeoHash(value)).selectKey((key, value) -> getGeoHashKey(value));

		KTable<String, HashMap<String, GenericRecord>> areasKTableAgg = areasKStreamEnriched.groupByKey().aggregate(
				HashMap<String, GenericRecord>::new, (k, v, map) -> AvroUtils.aggregateGenericRecordInMap(k, v, map),
				Materialized.with(Serdes.String(), hashMapSerde));
		KTable<String, HashMap<String, GenericRecord>> areasKTableAgg = areasKStreamEnriched
				.groupByKey(Serialized.with(null, getGenericAvroSerde())).aggregate(HashMap<String, GenericRecord>::new,
						// Agrega las diferentes áreas con el mismo geoHashCode
						(k, v, map) -> AvroUtils.aggregateGenericRecordInMap(k, v, map, "id"),
						Materialized.with(null, hashMapSerde));

		KStream<String, GenericRecord> pointsStreamEnriched = pointsStream
				.map((key, value) -> KeyValue.pair(AvroUtils.getSpecificRecordProperty(value, GEO_HASH_KEY).toString(),
						enrichPointWithGeoHash(value)));
				.mapValues(value -> enrichPointWithGeoHash(value)).selectKey((k, v) -> getGeoHashKey(v));

		pointsStreamEnriched.join(areasKTableAgg, (point, areas) -> getPointInAreaAlert(point, areas))
		pointsStreamEnriched
				.join(areasKTableAgg, (point, areas) -> getPointInAreaAlert(point, areas),
						Joined.valueSerde(getGenericAvroSerde()))
				.flatMapValues(value -> value).selectKey((k, v) -> v.get(RESULT_VESSEL_MMSI_PROPERTY))
				.to(resultTopic, Produced.with(null, getValueSerde()));
				.to(resultTopic, Produced.with(null, getSpecificAvroSerde()));

		return builder.build();
	}

	private String getGeoHashKey(GenericRecord v) {
		return v.get(GEO_HASH_KEY).toString();
	}

	private GenericRecord enrichPointWithGeoHash(SpecificRecord value) {

		GenericRecord avroRecord = AvroUtils.getGenericRecordFromClass(SimplePoint.class);
@@ -116,11 +126,11 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
			e.printStackTrace();
			return null;
		}

		avroRecord.put("mmsi", AvroUtils.getSpecificRecordProperty(value, "mmsi"));
		avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name"));
		avroRecord.put("dateTime", AvroUtils.getSpecificRecordProperty(value, "tstamp"));
		avroRecord.put("vesselType", AvroUtils.getSpecificRecordProperty(value, "type"));
		// Se crea un nuevo registro con el geohash code y solo con la info necesaria
		avroRecord.put("mmsi", AvroUtils.getSpecificRecordProperty(value, "mmsi").toString());
		avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name").toString());
		avroRecord.put("dateTime", Long.parseLong(AvroUtils.getSpecificRecordProperty(value, "tstamp").toString()));
		avroRecord.put("vesselType", Integer.parseInt(AvroUtils.getSpecificRecordProperty(value, "type").toString()));

		return avroRecord;
	}
@@ -141,16 +151,14 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
			try {
				List<String> geoHashList = GeoUtils.getGeoHash(geometry, GEO_HASH_PRECISION);

				for (String geoHash : geoHashList) {
					// Se crean nuevos registros con el geohash code y solo con la info necesaria
					GenericRecord avroRecord = AvroUtils.getGenericRecordFromClass(SimpleArea.class);
					avroRecord.put(RESULT_GEOMETRY_PROPERTY, geometry.toString());
				avroRecord.put("id", AvroUtils.getSpecificRecordProperty(value, "id"));
				avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name"));

				for (String geoHash : geoHashList) {

					GenericRecord avroRecordAux = avroRecord;
					avroRecordAux.put(GEO_HASH_KEY, geoHash);
					values.add(avroRecordAux);
					avroRecord.put("id", AvroUtils.getSpecificRecordProperty(value, "id").toString());
					avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name").toString());
					avroRecord.put(GEO_HASH_KEY, geoHash);
					values.add(avroRecord);
				}
			} catch (InvalidShapeException | ParseException e) {
				e.printStackTrace();
@@ -191,14 +199,19 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
				break;
			}

			// TODO: analizar si es necesario seguir procesando elementos una vez encontrada
			// un área.
			// Al menos no seguir procesando elementos de la misma área
			if (area.relate(point) == SpatialRelation.CONTAINS) {

				// Se crea una alerta con la info básica del punto y del área donde se encuentra
				PointInAreaAlert pointInAreaAlert = new PointInAreaAlert();
				pointInAreaAlert.setVesselMmsi(pointRecord.get("mmsi").toString());
				pointInAreaAlert.setVesselName(pointRecord.get("name").toString());
				pointInAreaAlert.setGeometry(pointRecord.get("geometry").toString());
				pointInAreaAlert.setDateTime(new DateTime(pointRecord.get("dateTime"), DateTimeZone.UTC));
				pointInAreaAlert.setVesselType((Integer) pointRecord.get("vesselType"));
				pointInAreaAlert.setDateTime(
						new DateTime(Long.parseLong(pointRecord.get("dateTime").toString()), DateTimeZone.UTC));
				pointInAreaAlert.setVesselType(Integer.parseInt(pointRecord.get("vesselType").toString()));
				pointInAreaAlert.setAreaId(areaRecord.get("id").toString());
				pointInAreaAlert.setAreaName(areaRecord.get("name").toString());

@@ -228,7 +241,8 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
		System.out.format("%s: %s%n", requiredVariables.get(POINTS_TOPIC), pointsTopic);
		System.out.format("%s: %s%n", requiredVariables.get(RESULT_TOPIC), resultTopic);

		VesselRestrictionCheckerApplication app = new VesselRestrictionCheckerApplication(schemaRegistryUrl);
		VesselRestrictionCheckerApplication app = new VesselRestrictionCheckerApplication(
				new CachedSchemaRegistryClient(schemaRegistryUrl, 100), schemaRegistryUrl);

		System.out.format("Kafka streams starting...%n");
		System.out.format("BootstrapServers: %s, SchemaRegistry: %s, AppId: %s%n", bootstrapServers, schemaRegistryUrl,