Commit 6327f904 authored by Noel Alonso's avatar Noel Alonso
Browse files

Implementa funcionalidades comunes de los streams

Todos los streams de event sourcing comparten funcionalidades que son
pasadas a la base
parent 647e01d8
Loading
Loading
Loading
Loading
+148 −3
Original line number Diff line number Diff line
@@ -2,10 +2,13 @@ package es.redmic.commandslib.streaming.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventTypes;
import es.redmic.commandslib.streaming.common.BaseStreams;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamUtils;
@@ -21,6 +24,8 @@ public abstract class EventSourcingStreams extends BaseStreams {
	@Override
	protected KafkaStreams processStreams() {

		createExtraStreams();

		KStream<String, Event> events = builder.stream(topic);

		// Create Success
@@ -39,11 +44,151 @@ public abstract class EventSourcingStreams extends BaseStreams {
				StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry));
	}

	protected abstract void processCreatedStream(KStream<String, Event> events);
	/*
	 * Función para crear streams extra que sean necesarios y específicos de cada
	 * tipo
	 */
	protected abstract void createExtraStreams();

	/*
	 * Función que a partir de los eventos de crear y confirmación de la vista,
	 * envía evento creado
	 */

	protected void processCreatedStream(KStream<String, Event> events) {

		// Stream filtrado por eventos de confirmación al crear
		KStream<String, Event> createConfirmedEvents = events
				.filter((id, event) -> (EventTypes.CREATE_CONFIRMED.equals(event.getType())));

		// Stream filtrado por eventos de petición de crear
		KStream<String, Event> createRequestEvents = events
				.filter((id, event) -> (EventTypes.CREATE.equals(event.getType())));

		// Join por id, mandando a kafka el evento de éxito
		createConfirmedEvents.join(createRequestEvents,
				(confirmedEvent, requestEvent) -> getCreatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(windowsTime)).to(topic);
	}

	/*
	 * Función que a partir del evento de confirmación de la vista y del evento
	 * create (petición de creación), si todo es correcto, genera evento created
	 */

	protected abstract Event getCreatedEvent(Event confirmedEvent, Event requestEvent);

	/*
	 * Función que a partir del evento modificar y la confirmación de la vista,
	 * envía evento modificado. Además a partir del flujo de eventos de confirmación
	 * de la vista, manda a procesar las ediciones parciales
	 */

	protected void processUpdatedStream(KStream<String, Event> events) {

		// Stream filtrado por eventos de confirmación al modificar
		KStream<String, Event> updateConfirmedEvents = events
				.filter((id, event) -> (EventTypes.UPDATE_CONFIRMED.equals(event.getType())));

		// Stream filtrado por eventos de petición de modificar
		KStream<String, Event> updateRequestEvents = events
				.filter((id, event) -> (EventTypes.UPDATE.equals(event.getType())));

		// Join por id, mandando a kafka el evento de éxito
		updateConfirmedEvents.join(updateRequestEvents,
				(confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(windowsTime)).to(topic);

		processPartialUpdatedStream(events, updateConfirmedEvents);
	}

	/*
	 * Función que a partir del evento de confirmación de la vista y del evento
	 * update (petición de modificación), si todo es correcto, genera evento updated
	 */

	protected abstract Event getUpdatedEvent(Event confirmedEvent, Event requestEvent);

	/*
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, envía evento modificado.
	 */

	protected abstract void processPartialUpdatedStream(KStream<String, Event> vesselEvents,
			KStream<String, Event> updateConfirmedEvents);

	/*
	 * Función que procesa los eventos fallidos
	 */
	protected void processFailedChangeStream(KStream<String, Event> events) {

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)
		KStream<String, Event> successEvents = events.filter((id,
				event) -> (EventTypes.CREATED.equals(event.getType()) || EventTypes.UPDATED.equals(event.getType())));

		processUpdateFailedStream(events, successEvents);

		processDeleteFailedStream(events, successEvents);
	}

	/*
	 * Función que a partir del último evento correcto y el evento fallido al
	 * editar, envía evento de cancelación
	 */

	protected void processUpdateFailedStream(KStream<String, Event> events, KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al modificar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (EventTypes.UPDATE_FAILED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getUpdateCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

	/*
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento UpdateCancelled
	 */

	protected abstract Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent);

	/*
	 * Función que a partir del último evento correcto y el evento fallido al
	 * eliminar, envía evento de cancelación
	 */

	protected void processDeleteFailedStream(KStream<String, Event> events, KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al borrar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (EventTypes.DELETE_FAILED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getDeleteCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

	/*
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento DeleteFailed
	 */

	protected abstract void processUpdatedStream(KStream<String, Event> events);
	protected abstract Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent);

	protected abstract void processFailedChangeStream(KStream<String, Event> events);
	/*
	 * Función para procesar modificaciones de referencias
	 */

	protected abstract void processPostUpdateStream(KStream<String, Event> events);