Commit 7e71c92b authored by Noel Alonso's avatar Noel Alonso
Browse files

Cambia el join de confirmación de stream a table

Garantizando que se realiza el join con la última petición solamente
parent 7365f93f
Loading
Loading
Loading
Loading
+7 −18
Original line number Diff line number Diff line
@@ -4,13 +4,9 @@ import java.util.HashMap;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
@@ -252,17 +248,19 @@ public class VesselEventStreams extends EventSourcingStreams {
	protected void processPartialUpdatedStream(KStream<String, Event> vesselEvents,
			KStream<String, Event> updateConfirmedEvents) {

		// Stream filtrado por eventos de petición de modificar vesseltype
		KStream<String, Event> updateRequestEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.UPDATE_VESSELTYPE.equals(event.getType())));
		// Table filtrado por eventos de petición de modificar vesseltype (Siempre el
		// último
		// evento)
		KTable<String, Event> updateRequestEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.UPDATE_VESSELTYPE.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		KStream<String, UpdateVesselTypeInVesselEvent> partialUpdateEvent = updateConfirmedEvents.join(
				updateRequestEvents,
				(confirmedEvent, requestEvent) -> isSameSession(confirmedEvent, requestEvent)
						? (UpdateVesselTypeInVesselEvent) requestEvent
						: null,
				JoinWindows.of(windowsTime));
						: null);

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)
@@ -464,15 +462,6 @@ public class VesselEventStreams extends EventSourcingStreams {
			if (currentVesselDTO != null && !currentVesselDTO.equals(vesselDTO)) {
				logger.info("Modificando barco vía stream " + currentVesselDTO.getId());

				ObjectMapper mapper = new ObjectMapper();
				try {
					logger.info("current " + mapper.writeValueAsString(currentVesselDTO));
					logger.info("new " + mapper.writeValueAsString(vesselDTO));
				} catch (JsonProcessingException e) {

					e.printStackTrace();
				}

				return getEnrichUpdateVesselEventFromRealtimeVessel(vesselDTO, vesselEvent);
			}
		}