Commit 6dbdaab0 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade streaming para reenviar eventos a snapshot

parent 0f32eb3f
Loading
Loading
Loading
Loading
+20 −11
Original line number Diff line number Diff line
@@ -69,7 +69,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		init();
	}

	/*
	/**
	 * Crea GlobalKTable de vessels agregados por vesseltype
	 * 
	 * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams#
@@ -91,7 +91,16 @@ public class VesselEventStreams extends EventSourcingStreams {
				Consumed.with(Serdes.String(), hashMapSerdeAggregationVesselInVesselTracking));
	}

	/*
	/**
	 * Reenvía eventos finales a topic de snapshot
	 */
	@Override
	protected void forwardSnapshotEvents(KStream<String, Event> events) {

		events.filter((id, event) -> (VesselEventTypes.isSnapshot(event.getType()))).to(snapshotTopic);
	}

	/**
	 * Función que a partir de los eventos de tipo CreateEnrich y globalKTable de
	 * las relaciones, enriquece el item antes de mandarlo a crear
	 * 
@@ -121,7 +130,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		return event;
	}

	/*
	/**
	 * Función que a partir del evento de confirmación de la vista y del evento
	 * create (petición de creación), si todo es correcto, genera evento created
	 */
@@ -172,7 +181,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		return event;
	}

	/*
	/**
	 * Función que a partir del evento de confirmación de la vista y del evento
	 * update (petición de modificación), si todo es correcto, genera evento updated
	 */
@@ -193,7 +202,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		return VesselEventFactory.getEvent(confirmedEvent, VesselEventTypes.UPDATED, vessel);
	}

	/*
	/**
	 * Comprueba si vessel está referenciado en tracking para cancelar el borrado
	 */
	@Override
@@ -226,7 +235,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		}
	}

	/*
	/**
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, envía evento modificado.
	 */
@@ -260,7 +269,7 @@ public class VesselEventStreams extends EventSourcingStreams {
				.to(topic);
	}

	/*
	/**
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, si todo es correcto, genera evento
	 * updated
@@ -280,7 +289,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		return VesselEventFactory.getEvent(partialUpdateConfirmEvent, VesselEventTypes.UPDATED, vessel);
	}

	/*
	/**
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento UpdateCancelled
	 */
@@ -303,7 +312,7 @@ public class VesselEventStreams extends EventSourcingStreams {
				eventError.getExceptionType(), eventError.getArguments());
	}

	/*
	/**
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento DeleteFailed
	 */
@@ -329,7 +338,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		return VesselEventTypes.isSnapshot(eventType);
	}

	/*
	/**
	 * Función para procesar modificaciones de referencias
	 */

+21 −12
Original line number Diff line number Diff line
@@ -59,7 +59,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
		init();
	}

	/*
	/**
	 * Crea GlobalKTable de vesselTracking agregados por vessel
	 * 
	 * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams#
@@ -77,7 +77,16 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
		vesselEvents = builder.stream(vesselUpdatedTopic);
	}

	/*
	/**
	 * Reenvía eventos finales a topic de snapshot
	 */
	@Override
	protected void forwardSnapshotEvents(KStream<String, Event> events) {

		events.filter((id, event) -> (VesselTrackingEventTypes.isSnapshot(event.getType()))).to(snapshotTopic);
	}

	/**
	 * Función que a partir de los eventos de tipo CreateEnrich y globalKTable de
	 * las relaciones, enriquece el item antes de mandarlo a crear
	 * 
@@ -110,7 +119,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
		return event;
	}

	/*
	/**
	 * Función que a partir del evento de confirmación de la vista y del evento
	 * create (petición de creación), si todo es correcto, genera evento created
	 */
@@ -131,7 +140,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
		return VesselTrackingEventFactory.getEvent(confirmedEvent, VesselTrackingEventTypes.CREATED, vesselTracking);
	}

	/*
	/**
	 * Función que a partir de los eventos de tipo UpdateEnrich y globalKTable de
	 * las relaciones, enriquece el item antes de mandarlo a modificar
	 * 
@@ -163,7 +172,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
		return event;
	}

	/*
	/**
	 * Función que a partir del evento de confirmación de la vista y del evento
	 * update (petición de modificación), si todo es correcto, genera evento updated
	 */
@@ -184,14 +193,14 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
		return VesselTrackingEventFactory.getEvent(confirmedEvent, VesselTrackingEventTypes.UPDATED, vesselTracking);
	}

	/*
	/**
	 * Comprueba si vessel tracking está referenciado en otro servicio
	 */
	@Override
	protected void processDeleteStream(KStream<String, Event> events) {
	}

	/*
	/**
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, envía evento modificado.
	 */
@@ -225,7 +234,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
				.to(topic);
	}

	/*
	/**
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, si todo es correcto, genera evento
	 * updated
@@ -245,7 +254,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
				vesselTracking);
	}

	/*
	/**
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento UpdateCancelled
	 */
@@ -268,7 +277,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
				vesselTracking, eventError.getExceptionType(), eventError.getArguments());
	}

	/*
	/**
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento DeleteFailed
	 */
@@ -294,7 +303,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
		return VesselTrackingEventTypes.isSnapshot(eventType);
	}

	/*
	/**
	 * Función para procesar modificaciones de referencias
	 */

+17 −8
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		init();
	}

	/*
	/**
	 * Crea KTable de vessels agregados por vesseltype
	 * 
	 * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams#
@@ -49,7 +49,16 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		aggByVesselType = builder.table(vesselsAggByVesselTypeTopic, Consumed.with(Serdes.String(), hashMapSerde));
	}

	/*
	/**
	 * Reenvía eventos finales a topic de snapshot
	 */
	@Override
	protected void forwardSnapshotEvents(KStream<String, Event> events) {

		events.filter((id, event) -> (VesselTypeEventTypes.isSnapshot(event.getType()))).to(snapshotTopic);
	}

	/**
	 * Función que apartir del evento de confirmación de la vista y del evento
	 * create (petición de creación), si todo es correcto, genera evento created
	 */
@@ -70,7 +79,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		return VesselTypeEventFactory.getEvent(confirmedEvent, VesselTypeEventTypes.CREATED, vesselType);
	}

	/*
	/**
	 * Función que apartir del evento de confirmación de la vista y del evento
	 * update (petición de modificación), si todo es correcto, genera evento updated
	 */
@@ -91,7 +100,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		return VesselTypeEventFactory.getEvent(requestEvent, VesselTypeEventTypes.UPDATED, vesselType);
	}

	/*
	/**
	 * Comprueba si vesselType está referenciado en vessel para cancelar el borrado
	 */

@@ -125,7 +134,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		}
	}

	/*
	/**
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento UpdateCancelled
	 */
@@ -145,7 +154,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
				eventError.getExceptionType(), eventError.getArguments());
	}

	/*
	/**
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento DeleteFailed
	 */
@@ -187,7 +196,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		// En este caso no hay modificaciones parciales
	}

	/*
	/**
	 * Función para procesar modificaciones de referencias
	 */