Commit 686df2d8 authored by Noel Alonso's avatar Noel Alonso
Browse files

Cambia la forma de agregar en stream postUpdate

De esta forma en lugar de usar una lista, se usa un hashmap, almancenando solo
los últimos eventos de cada clave.
parent 924a2633
Loading
Loading
Loading
Loading
+26 −24
Original line number Diff line number Diff line
package es.redmic.vesselscommands.streams;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -11,7 +13,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
import es.redmic.brokerlib.avro.serde.ArrayListSerde;
import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde;
import es.redmic.commandslib.statestore.StreamConfig;
import es.redmic.commandslib.streams.EventStreams;
import es.redmic.vesselslib.dto.VesselDTO;
@@ -202,25 +204,15 @@ public class VesselEventStreams extends EventStreams {
	@Override
	protected void processPostUpdateStream(KStream<String, Event> vesselEvents) {

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)
		KStream<String, Event> vesselEventsStream = vesselEvents.filter((id, event) -> {
			String type = event.getType();
			return (VesselEventType.VESSEL_CREATED.toString().equals(type)
					|| VesselEventType.VESSEL_UPDATED.toString().equals(type)
					|| VesselEventType.CREATE_VESSEL.toString().equals(type)
					|| VesselEventType.UPDATE_VESSEL.toString().equals(type));
		});

		KStream<String, Event> vesselEventsStreamByTypeId = vesselEventsStream
		KStream<String, Event> vesselEventsStreamByTypeId = vesselEvents
				.selectKey((k, v) -> ((VesselEvent) v).getVessel().getType().getId());

		KTable<String, ArrayList<VesselEvent>> vesselEventsTable = vesselEventsStreamByTypeId.groupByKey()
				.aggregate(ArrayList::new, (key, value, list) -> {
					// Añade a list cada uno de los values
					list.add((VesselEvent) value);
					return list;
				}, Materialized.with(Serdes.String(), new ArrayListSerde<>(schemaRegistry)));
		KTable<String, HashMap<String, VesselEvent>> vesselEventsTable = vesselEventsStreamByTypeId.groupByKey()
				.aggregate(HashMap::new, (key, value, hashMap) -> {
					// Añade a hashmap cada uno de los values
					hashMap.put(value.getAggregateId(), (VesselEvent) value);
					return hashMap;
				}, Materialized.with(Serdes.String(), new HashMapSerde<>(schemaRegistry)));

		// Vesseltypes modificados
		KStream<String, Event> vesselTypeEvents = builder.stream(vesselTypeTopic);
@@ -237,19 +229,19 @@ public class VesselEventStreams extends EventStreams {
	}

	private ArrayList<VesselEvent> getPostUpdateEvent(Event updateReferenceEvent,
			ArrayList<VesselEvent> vesselWithReferenceEvents) {
			HashMap<String, VesselEvent> vesselWithReferenceEvents) {

		ArrayList<VesselEvent> result = new ArrayList<>();

		for (VesselEvent vesselEvent : vesselWithReferenceEvents) {
		for (Map.Entry<String, VesselEvent> entry : vesselWithReferenceEvents.entrySet()) {

			VesselEvent vesselEvent = entry.getValue();
			VesselTypeDTO vesselType = ((VesselTypeEvent) updateReferenceEvent).getVesselType();

			if (VesselEventType.CREATE_VESSEL.toString().equals(vesselEvent.getType())
					|| VesselEventType.UPDATE_VESSEL.toString().equals(vesselEvent.getType())) {
			if (!isEventCompleted(vesselEvent.getType())) {

				String message = "Item con id " + vesselEvent.getAggregateId()
						+ " en proceso de creación, por lo que no se modificó la referencia "
						+ " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia "
						+ updateReferenceEvent.getAggregateId();

				logger.info(message);
@@ -272,10 +264,20 @@ public class VesselEventStreams extends EventStreams {
					updateVesselEvent.setVessel(vessel);
					result.add(updateVesselEvent);
				} else {
					System.out.println("dentro");
					logger.debug("VesselType ya estaba actualizado o los campos indexados no han cambiado ");
				}
			}
		}
		return result;
	}

	private boolean isEventCompleted(String type) {

		return (VesselEventType.VESSEL_CREATED.toString().equals(type)
				|| VesselEventType.VESSEL_UPDATED.toString().equals(type)
				|| VesselEventType.VESSEL_DELETED.toString().equals(type)
				|| VesselEventType.CREATE_VESSEL_CANCELLED.toString().equals(type)
				|| VesselEventType.UPDATE_VESSEL_CANCELLED.toString().equals(type)
				|| VesselEventType.DELETE_VESSEL_CANCELLED.toString().equals(type));
	}
}