Commit 73d0eb58 authored by Noel Alonso's avatar Noel Alonso
Browse files

Elimina referencias a eventos eliminados

parent 17b8640c
Loading
Loading
Loading
Loading
+0 −15
Original line number Diff line number Diff line
@@ -7,7 +7,6 @@ import es.redmic.vesselscommands.commands.vesseltracking.DeleteVesselTrackingCom
import es.redmic.vesselscommands.commands.vesseltracking.UpdateVesselTrackingCommand;
import es.redmic.vesselscommands.statestore.VesselTrackingStateStore;
import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO;
import es.redmic.vesselslib.dto.tracking.VesselTrackingPropertiesDTO;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventTypes;
import es.redmic.vesselslib.events.vesseltracking.common.VesselTrackingEvent;
@@ -15,7 +14,6 @@ import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingCan
import es.redmic.vesselslib.events.vesseltracking.create.EnrichCreateVesselTrackingEvent;
import es.redmic.vesselslib.events.vesseltracking.delete.DeleteVesselTrackingEvent;
import es.redmic.vesselslib.events.vesseltracking.delete.VesselTrackingDeletedEvent;
import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.UpdateVesselInVesselTrackingEvent;
import es.redmic.vesselslib.events.vesseltracking.update.EnrichUpdateVesselTrackingEvent;

public class VesselTrackingAggregate extends Aggregate {
@@ -162,19 +160,6 @@ public class VesselTrackingAggregate extends Aggregate {
		super.apply(event);
	}

	public void apply(UpdateVesselInVesselTrackingEvent event) {
		if (this.vesselTracking == null)
			this.vesselTracking = new VesselTrackingDTO();

		VesselTrackingPropertiesDTO properties = new VesselTrackingPropertiesDTO();

		properties.setVessel(event.getVessel());

		this.vesselTracking.setProperties(properties);

		super.apply(event);
	}

	public void apply(VesselTrackingEvent event) {
		this.vesselTracking = event.getVesselTracking();
		super.apply(event);
+1 −51
Original line number Diff line number Diff line
package es.redmic.vesselscommands.streams;

import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
@@ -18,7 +16,6 @@ import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventFactory;
import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventTypes;
import es.redmic.vesselslib.events.vesseltracking.common.VesselTrackingEvent;
import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEnrichedEvent;
import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.UpdateVesselInVesselTrackingEvent;
import es.redmic.vesselslib.events.vesseltracking.update.UpdateVesselTrackingEnrichedEvent;

public class VesselTrackingEventStreams extends EventSourcingStreams {
@@ -185,58 +182,11 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
	protected void processDeleteStream(KStream<String, Event> events) {
	}

	/**
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, envía evento modificado.
	 */
	@Override
	protected void processPartialUpdatedStream(KStream<String, Event> vesselTrackingEvents,
			KStream<String, Event> updateConfirmedEvents) {

		// Stream filtrado por eventos de petición de modificar vessel en vesselTracking
		KStream<String, Event> updateRequestEvents = vesselTrackingEvents
				.filter((id, event) -> (VesselTrackingEventTypes.UPDATE_VESSEL.equals(event.getType())));

		// Join por id, mandando a kafka el evento de éxito
		KStream<String, UpdateVesselInVesselTrackingEvent> partialUpdateEvent = updateConfirmedEvents.join(
				updateRequestEvents,
				(confirmedEvent, requestEvent) -> isSameSession(confirmedEvent, requestEvent)
						? (UpdateVesselInVesselTrackingEvent) requestEvent
						: null,
				JoinWindows.of(windowsTime));

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)
		KStream<String, Event> successEvents = vesselTrackingEvents
				.filter((id, event) -> (VesselTrackingEventTypes.CREATED.equals(event.getType())
						|| VesselTrackingEventTypes.UPDATED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		partialUpdateEvent.join(successEventsTable, (partialUpdateConfirmEvent,
				lastSuccessEvent) -> getUpdatedEventFromPartialUpdate(partialUpdateConfirmEvent, lastSuccessEvent))
				.to(topic);
	}

	/**
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, si todo es correcto, genera evento
	 * updated
	 */

	private Event getUpdatedEventFromPartialUpdate(UpdateVesselInVesselTrackingEvent partialUpdateConfirmEvent,
			Event lastSuccessEvent) {

		assert VesselTrackingEventTypes.isSnapshot(lastSuccessEvent.getType());

		assert partialUpdateConfirmEvent.getType().equals(VesselTrackingEventTypes.UPDATE_VESSEL);

		VesselTrackingDTO vesselTracking = ((VesselTrackingEvent) lastSuccessEvent).getVesselTracking();
		vesselTracking.getProperties().setVessel(partialUpdateConfirmEvent.getVessel());

		return VesselTrackingEventFactory.getEvent(partialUpdateConfirmEvent, VesselTrackingEventTypes.UPDATED,
				vesselTracking);
		// En este caso no existe modificación parcial de vesselTracking vía stream
	}

	/**