Commit 50c96230 authored by Noel Alonso's avatar Noel Alonso
Browse files

Envía la agregación por vesseltype a un topic

De este modo se puede usar un globalKTable para procesar
todos los postupdate independientemente de la instancia
parent e6777713
Loading
Loading
Loading
Loading
+36 −15
Original line number Diff line number Diff line
@@ -5,10 +5,12 @@ import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
@@ -31,6 +33,8 @@ import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent;

public class VesselEventStreams extends EventStreams {

	private String VESSELS_AGG_BY_VESSELTYPE_TOPIC = "vesselsAggByVesselType";

	private String vesselTypeTopic;

	public VesselEventStreams(StreamConfig config, String vesselTypeTopic, AlertService alertService) {
@@ -262,24 +266,27 @@ public class VesselEventStreams extends EventStreams {
	@Override
	protected void processPostUpdateStream(KStream<String, Event> vesselEvents) {

		// Stream filtrado por eventos que contengan Vessel dentro que son los de
		// comienzo y final
		// del ciclo
		// Filtra eventos que contengan Vessel dentro que son los de comienzo y final
		// del ciclo, los agrega por vesseltype en un hashmap y los envia a un topic

		HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerde = new HashMapSerde<>(
				schemaRegistry);

		KStream<String, Event> vesselEventsStream = vesselEvents.filter((id, event) -> {
			return (event instanceof VesselEvent);
		});
		}).selectKey((k, v) -> getVesselTypeIdFromVessel(v));

		KStream<String, Event> vesselEventsStreamByTypeId = vesselEventsStream
				.selectKey((k, v) -> ((VesselEvent) v).getVessel().getType().getId());
		vesselEventsStream.groupByKey()

		KTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> vesselEventsTable = vesselEventsStreamByTypeId
				.groupByKey().aggregate(HashMap::new, (key, value, hashMap) -> {
					// Añade a hashmap cada uno de los values
					hashMap.put(value.getAggregateId(),
							new AggregationVesselTypeInVesselPostUpdateEvent(value.getType(),
									((VesselEvent) value).getVessel().getType()).buildFrom(value));
					return hashMap;
				}, Materialized.with(Serdes.String(), new HashMapSerde<>(schemaRegistry)));
				.aggregate(HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>::new,
						(k, v, map) -> aggregateVesselsByVesselType(k, v, map),
						Materialized.with(Serdes.String(), hashMapSerde))
				.toStream().to(VESSELS_AGG_BY_VESSELTYPE_TOPIC, Produced.valueSerde(hashMapSerde));

		// Crea un store global para procesar los datos de todas las instancias

		GlobalKTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType = builder
				.globalTable(VESSELS_AGG_BY_VESSELTYPE_TOPIC);

		// Vesseltypes modificados
		KStream<String, Event> vesselTypeEvents = builder.stream(vesselTypeTopic);
@@ -287,7 +294,8 @@ public class VesselEventStreams extends EventStreams {
		KStream<String, Event> updateReferenceEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.UPDATED.equals(event.getType())));

		KStream<String, ArrayList<UpdateVesselTypeInVesselEvent>> join = updateReferenceEvents.join(vesselEventsTable,
		KStream<String, ArrayList<UpdateVesselTypeInVesselEvent>> join = updateReferenceEvents.join(aggByVesselType,
				(k, v) -> k,
				(updateReferenceEvent, vesselWithReferenceEvents) -> getPostUpdateEvent(updateReferenceEvent,
						vesselWithReferenceEvents));

@@ -295,6 +303,19 @@ public class VesselEventStreams extends EventStreams {
		join.flatMapValues(value -> value).selectKey((k, v) -> v.getAggregateId()).to(topic);
	}

	private String getVesselTypeIdFromVessel(Event evt) {

		return ((VesselEvent) evt).getVessel().getType().getId();
	}

	private HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> aggregateVesselsByVesselType(String key,
			Event value, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMap) {

		hashMap.put(value.getAggregateId(), new AggregationVesselTypeInVesselPostUpdateEvent(value.getType(),
				((VesselEvent) value).getVessel().getType()).buildFrom(value));
		return hashMap;
	}

	private ArrayList<UpdateVesselTypeInVesselEvent> getPostUpdateEvent(Event updateReferenceEvent,
			HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> vesselWithReferenceEvents) {