Commit 986c5008 authored by Noel Alonso's avatar Noel Alonso
Browse files

Pasa funcionalidades comunes a la base

parent eb3450d2
Loading
Loading
Loading
Loading
+92 −117
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@ import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@@ -16,10 +17,10 @@ import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde;
import es.redmic.commandslib.statestore.StreamConfig;
import es.redmic.commandslib.streams.EventStreams;
import es.redmic.vesselslib.dto.VesselDTO;
import es.redmic.vesselslib.dto.VesselTypeDTO;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.streams.EventSourcingStreams;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO;
import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.common.VesselEvent;
import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent;
@@ -31,38 +32,49 @@ import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent;
import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes;
import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent;

public class VesselEventStreams extends EventStreams {

	private String VESSELS_AGG_BY_VESSELTYPE_TOPIC = "vesselsAggByVesselType";
public class VesselEventStreams extends EventSourcingStreams {

	private String vesselTypeTopic;

	public VesselEventStreams(StreamConfig config, String vesselTypeTopic, AlertService alertService) {
	private String vesselsAggByVesselTypeTopic;

	private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerde;

	private GlobalKTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType;

	public VesselEventStreams(StreamConfig config, String vesselTypeTopic, String vesselsAggByVesselTypeTopic,
			AlertService alertService) {
		super(config, alertService);
		this.vesselTypeTopic = vesselTypeTopic;
		logger.info("Arrancado servicio de compensación de errores de edición y PostUpdate de Vessel con Id: "
				+ this.serviceId);
		this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic;
		this.hashMapSerde = new HashMapSerde<>(schemaRegistry);

		logger.info("Arrancado servicio de streaming para event sourcing de Vessel con Id: " + this.serviceId);
		init();
	}

	/*
	 * Crea stream de vessels agregados por vesseltype
	 * 
	 * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams#
	 * createExtraStreams()
	 */
	@Override
	protected void processCreatedStream(KStream<String, Event> vesselEvents) {
	protected void createExtraStreams() {

		// Stream filtrado por eventos de confirmación al crear
		KStream<String, Event> createConfirmedEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.CREATE_CONFIRMED.equals(event.getType())));

		// Stream filtrado por eventos de petición de crear
		KStream<String, Event> createRequestEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.CREATE.equals(event.getType())));

		// Join por id, mandando a kafka el evento de éxito
		createConfirmedEvents.join(createRequestEvents,
				(confirmedEvent, requestEvent) -> getCreatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(windowsTime)).to(topic);
		// Crea un store global para procesar los datos de todas las instancias de
		// vessels agregados por vesselType
		aggByVesselType = builder.globalTable(vesselsAggByVesselTypeTopic,
				Consumed.with(Serdes.String(), hashMapSerde));
	}

	private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) {
	/*
	 * Función que a partir del evento de confirmación de la vista y del evento
	 * create (petición de creación), si todo es correcto, genera evento created
	 */

	@Override
	protected Event getCreatedEvent(Event confirmedEvent, Event requestEvent) {

		assert requestEvent.getType().equals(VesselEventTypes.CREATE);

@@ -81,26 +93,13 @@ public class VesselEventStreams extends EventStreams {
		return successfulEvent;
	}

	@Override
	protected void processUpdatedStream(KStream<String, Event> vesselEvents) {

		// Stream filtrado por eventos de confirmación al modificar
		KStream<String, Event> updateConfirmedEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.UPDATE_CONFIRMED.equals(event.getType())));
	/*
	 * Función que a partir del evento de confirmación de la vista y del evento
	 * update (petición de modificación), si todo es correcto, genera evento updated
	 */

		// Stream filtrado por eventos de petición de modificar
		KStream<String, Event> updateRequestEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.UPDATE.equals(event.getType())));

		// Join por id, mandando a kafka el evento de éxito
		updateConfirmedEvents.join(updateRequestEvents,
				(confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(windowsTime)).to(topic);

		processPartialUpdatedStream(vesselEvents, updateConfirmedEvents);
	}

	private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) {
	@Override
	protected Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) {

		assert requestEvent.getType().equals(VesselEventTypes.UPDATE);

@@ -119,7 +118,12 @@ public class VesselEventStreams extends EventStreams {
		return successfulEvent;
	}

	private void processPartialUpdatedStream(KStream<String, Event> vesselEvents,
	/*
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, envía evento modificado.
	 */
	@Override
	protected void processPartialUpdatedStream(KStream<String, Event> vesselEvents,
			KStream<String, Event> updateConfirmedEvents) {

		// Stream filtrado por eventos de petición de modificar vesseltype
@@ -129,7 +133,9 @@ public class VesselEventStreams extends EventStreams {
		// Join por id, mandando a kafka el evento de éxito
		KStream<String, UpdateVesselTypeInVesselEvent> partialUpdateEvent = updateConfirmedEvents.join(
				updateRequestEvents,
				(confirmedEvent, requestEvent) -> checkConfirmPartialUpdate(confirmedEvent, requestEvent),
				(confirmedEvent, requestEvent) -> isSameSession(confirmedEvent, requestEvent)
						? (UpdateVesselTypeInVesselEvent) requestEvent
						: null,
				JoinWindows.of(windowsTime));

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
@@ -146,14 +152,11 @@ public class VesselEventStreams extends EventStreams {
				.to(topic);
	}

	// Comprueba si la confirmación corresponde con el evento enviado.
	private UpdateVesselTypeInVesselEvent checkConfirmPartialUpdate(Event confirmedEvent, Event requestEvent) {

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}
		return (UpdateVesselTypeInVesselEvent) requestEvent;
	}
	/*
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, si todo es correcto, genera evento
	 * updated
	 */

	private Event getUpdatedEventFromPartialUpdate(UpdateVesselTypeInVesselEvent partialUpdateConfirmEvent,
			Event lastSuccessEvent) {
@@ -174,53 +177,13 @@ public class VesselEventStreams extends EventStreams {
		return successfulEvent;
	}

	@Override
	protected void processFailedChangeStream(KStream<String, Event> vesselEvents) {
	/*
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento UpdateCancelled
	 */

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)
		KStream<String, Event> successEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.CREATED.equals(event.getType())
						|| VesselEventTypes.UPDATED.equals(event.getType())));

		processUpdateFailedStream(vesselEvents, successEvents);

		processDeleteFailedStream(vesselEvents, successEvents);
	}

	protected void processUpdateFailedStream(KStream<String, Event> vesselEvents,
			KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al modificar
		KStream<String, Event> failedEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.UPDATE_FAILED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getUpdateCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

	protected void processDeleteFailedStream(KStream<String, Event> vesselEvents,
			KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al borrar
		KStream<String, Event> failedEvents = vesselEvents
				.filter((id, event) -> (VesselEventTypes.DELETE_FAILED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getDeleteCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

	private Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) {
	@Override
	protected Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert lastSuccessEvent.getType().equals(VesselEventTypes.CREATED)
				|| lastSuccessEvent.getType().equals(VesselEventTypes.UPDATED);
@@ -243,7 +206,13 @@ public class VesselEventStreams extends EventStreams {
		return cancelledEvent;
	}

	private Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) {
	/*
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento DeleteFailed
	 */

	@Override
	protected Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert lastSuccessEvent.getType().equals(VesselEventTypes.CREATED)
				|| lastSuccessEvent.getType().equals(VesselEventTypes.UPDATED);
@@ -263,30 +232,41 @@ public class VesselEventStreams extends EventStreams {
		return cancelledEvent;
	}

	/*
	 * Función para procesar modificaciones de referencias
	 */

	@Override
	protected void processPostUpdateStream(KStream<String, Event> vesselEvents) {

		// Filtra eventos que contengan Vessel dentro que son los de comienzo y final
		// del ciclo, los agrega por vesseltype en un hashmap y los envia a un topic

		HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerde = new HashMapSerde<>(
				schemaRegistry);

		KStream<String, Event> vesselEventsStream = vesselEvents.filter((id, event) -> {
			return (event instanceof VesselEvent);
		}).selectKey((k, v) -> getVesselTypeIdFromVessel(v));

		vesselEventsStream.groupByKey()
		// Para cada una de las referencias

		// Agregar por vesseltype
		aggregateVesselsByVesselType(vesselEventsStream);

		// processar los vesseltype modificados
		processVesselTypePostUpdate();
	}

	private String getVesselTypeIdFromVessel(Event evt) {

		return ((VesselEvent) evt).getVessel().getType().getId();
	}

	private void aggregateVesselsByVesselType(KStream<String, Event> vesselEventsStream) {

		vesselEventsStream.groupByKey()
				.aggregate(HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>::new,
						(k, v, map) -> aggregateVesselsByVesselType(k, v, map),
						Materialized.with(Serdes.String(), hashMapSerde))
				.toStream().to(VESSELS_AGG_BY_VESSELTYPE_TOPIC, Produced.valueSerde(hashMapSerde));

		// Crea un store global para procesar los datos de todas las instancias
				.toStream().to(vesselsAggByVesselTypeTopic, Produced.with(Serdes.String(), hashMapSerde));
	}

		GlobalKTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType = builder
				.globalTable(VESSELS_AGG_BY_VESSELTYPE_TOPIC);
	private void processVesselTypePostUpdate() {

		// Vesseltypes modificados
		KStream<String, Event> vesselTypeEvents = builder.stream(vesselTypeTopic);
@@ -303,11 +283,6 @@ public class VesselEventStreams extends EventStreams {
		join.flatMapValues(value -> value).selectKey((k, v) -> v.getAggregateId()).to(topic);
	}

	private String getVesselTypeIdFromVessel(Event evt) {

		return ((VesselEvent) evt).getVessel().getType().getId();
	}

	private HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> aggregateVesselsByVesselType(String key,
			Event value, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMap) {

+49 −78
Original line number Diff line number Diff line
package es.redmic.vesselscommands.streams;

import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
@@ -25,24 +23,26 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		init();
	}

	@Override
	protected void processCreatedStream(KStream<String, Event> vesselTypeEvents) {

		// Stream filtrado por eventos de confirmación al crear
		KStream<String, Event> createConfirmedEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.CREATE_CONFIRMED.toString().equals(event.getType())));
	/*
	 * (non-Javadoc)
	 * 
	 * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams#
	 * createExtraStreams()
	 */

		// Stream filtrado por eventos de petición de crear
		KStream<String, Event> createRequestEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.CREATE.toString().equals(event.getType())));
	@Override
	protected void createExtraStreams() {
		// No existen streams extra necesarios

		// Join por id, mandando a kafka el evento de éxito
		createConfirmedEvents.join(createRequestEvents,
				(confirmedEvent, requestEvent) -> getCreatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(windowsTime)).to(topic);
	}

	private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) {
	/*
	 * Función que apartir del evento de confirmación de la vista y del evento
	 * create (petición de creación), si todo es correcto, genera evento created
	 */

	@Override
	protected Event getCreatedEvent(Event confirmedEvent, Event requestEvent) {

		assert requestEvent.getType().equals(VesselTypeEventTypes.CREATE);

@@ -61,24 +61,13 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		return successfulEvent;
	}

	@Override
	protected void processUpdatedStream(KStream<String, Event> vesselTypeEvents) {

		// Stream filtrado por eventos de confirmación al modificar
		KStream<String, Event> updateConfirmedEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.UPDATE_CONFIRMED.toString().equals(event.getType())));

		// Stream filtrado por eventos de petición de modificar
		KStream<String, Event> updateRequestEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.UPDATE.toString().equals(event.getType())));
	/*
	 * Función que apartir del evento de confirmación de la vista y del evento
	 * update (petición de modificación), si todo es correcto, genera evento updated
	 */

		// Join por id, mandando a kafka el evento de éxito
		updateConfirmedEvents.join(updateRequestEvents,
				(confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(windowsTime)).to(topic);
	}

	private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) {
	@Override
	protected Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) {

		assert requestEvent.getType().equals(VesselTypeEventTypes.UPDATE);

@@ -99,54 +88,24 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		return successfulEvent;
	}

	@Override
	protected void processFailedChangeStream(KStream<String, Event> vesselTypeEvents) {

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)
		KStream<String, Event> successEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.CREATED.toString().equals(event.getType())
						|| VesselTypeEventTypes.UPDATED.toString().equals(event.getType())));

		processUpdateFailedStream(vesselTypeEvents, successEvents);

		processDeleteFailedStream(vesselTypeEvents, successEvents);

	}

	protected void processUpdateFailedStream(KStream<String, Event> vesselTypeEvents,
			KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al modificar
		KStream<String, Event> failedEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.UPDATE_FAILED.toString().equals(event.getType())));
	/*
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, si todo es correcto, genera evento
	 * updated
	 */

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getUpdateCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	@Override
	protected void processPartialUpdatedStream(KStream<String, Event> vesselTypeEvents,
			KStream<String, Event> updateConfirmedEvents) {
	}

	protected void processDeleteFailedStream(KStream<String, Event> vesselTypeEvents,
			KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al borrar
		KStream<String, Event> failedEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.DELETE_FAILED.toString().equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);
	/*
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento UpdateCancelled
	 */

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getDeleteCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

	private Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) {
	@Override
	protected Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert failedEvent.getType().equals(VesselTypeEventTypes.UPDATE_FAILED);

@@ -166,7 +125,13 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		return cancelledEvent;
	}

	private Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) {
	/*
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento DeleteFailed
	 */

	@Override
	protected Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert failedEvent.getType().equals(VesselTypeEventTypes.DELETE_FAILED);

@@ -186,7 +151,13 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		return cancelledEvent;
	}

	/*
	 * Función para procesar modificaciones de referencias
	 */

	@Override
	protected void processPostUpdateStream(KStream<String, Event> events) {

		// En este caso no hay modificación de relaciones
	}
}