Commit 5eb2eade authored by Noel Alonso's avatar Noel Alonso
Browse files

Cambia el join de confirmación de stream a table

De esta forma se garantiza que se realiza el join con la última petición
parent 7e73825c
Loading
Loading
Loading
Loading
+18 −13
Original line number Diff line number Diff line
@@ -2,7 +2,6 @@ package es.redmic.commandslib.streaming.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

@@ -93,14 +92,17 @@ public abstract class EventSourcingStreams extends BaseStreams {
		KStream<String, Event> createConfirmedEvents = events
				.filter((id, event) -> (EventTypes.CREATE_CONFIRMED.equals(event.getType())));

		// Stream filtrado por eventos de petición de crear
		KStream<String, Event> createRequestEvents = events
				.filter((id, event) -> (EventTypes.CREATE.equals(event.getType())));
		// Table filtrado por eventos de petición de crear (Siempre el último
		// evento)
		KTable<String, Event> createRequestEvents = events
				.filter((id, event) -> (EventTypes.CREATE.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		createConfirmedEvents.join(createRequestEvents,
				(confirmedEvent, requestEvent) -> getCreatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(windowsTime)).filter((k, v) -> (v != null)).to(topic);
		createConfirmedEvents
				.join(createRequestEvents,
						(confirmedEvent, requestEvent) -> getCreatedEvent(confirmedEvent, requestEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	/*
@@ -130,14 +132,17 @@ public abstract class EventSourcingStreams extends BaseStreams {
		KStream<String, Event> updateConfirmedEvents = events
				.filter((id, event) -> (EventTypes.UPDATE_CONFIRMED.equals(event.getType())));

		// Stream filtrado por eventos de petición de modificar
		KStream<String, Event> updateRequestEvents = events
				.filter((id, event) -> (EventTypes.UPDATE.equals(event.getType())));
		// Table filtrado por eventos de petición de modificar (Siempre el último
		// evento)
		KTable<String, Event> updateRequestEvents = events
				.filter((id, event) -> (EventTypes.UPDATE.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		updateConfirmedEvents.join(updateRequestEvents,
				(confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(windowsTime)).filter((k, v) -> (v != null)).to(topic);
		updateConfirmedEvents
				.join(updateRequestEvents,
						(confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent))
				.filter((k, v) -> (v != null)).to(topic);

		processPartialUpdatedStream(events, updateConfirmedEvents);
	}