Commit 9854c8f6 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade comprobación antes de borrar vessel

De este modo, si está referenciado, no se borra el dato
parent 1fd0f761
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@ import es.redmic.vesselslib.events.vessel.common.VesselEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent;
import es.redmic.vesselslib.events.vessel.create.EnrichCreateVesselEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselEvent;
import es.redmic.vesselslib.events.vessel.delete.CheckDeleteVesselEvent;
import es.redmic.vesselslib.events.vessel.delete.VesselDeletedEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent;
import es.redmic.vesselslib.events.vessel.update.EnrichUpdateVesselEvent;
@@ -84,7 +84,7 @@ public class VesselAggregate extends Aggregate {
		return evt;
	}

	public DeleteVesselEvent process(DeleteVesselCommand cmd) {
	public CheckDeleteVesselEvent process(DeleteVesselCommand cmd) {

		assert vesselStateStore != null;

@@ -96,7 +96,7 @@ public class VesselAggregate extends Aggregate {

		checkState(id, state.getType());

		DeleteVesselEvent evt = new DeleteVesselEvent();
		CheckDeleteVesselEvent evt = new CheckDeleteVesselEvent();
		evt.setAggregateId(id);
		evt.setVersion(getVersion() + 1);
		return evt;
+22 −3
Original line number Diff line number Diff line
@@ -30,9 +30,11 @@ import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselFailedEvent;
import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent;
import es.redmic.vesselslib.events.vessel.delete.CheckDeleteVesselEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCheckFailedEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCheckedEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselConfirmedEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselEvent;
import es.redmic.vesselslib.events.vessel.delete.VesselDeletedEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselEnrichedEvent;
@@ -54,6 +56,9 @@ public class VesselCommandHandler extends CommandHandler {
	@Value("${broker.topic.vessel.updated}")
	private String vesselUpdatedTopic;

	@Value("${broker.topic.tracking.agg.by.vessel}")
	private String vesselTrackingAggByVesselTopic;

	@Value("${broker.topic.vessel.type.updated}")
	private String vesselTypeUpdatedTopic;

@@ -111,7 +116,8 @@ public class VesselCommandHandler extends CommandHandler {
				config
					.serviceId(vesselsEventsStreamId)
					.windowsTime(streamWindowsTime)
					.build(), vesselTypeTopic, vesselsAggByVesselTypeTopic, vesselTypeUpdatedTopic, alertService);
					.build(), vesselTypeTopic, vesselsAggByVesselTypeTopic,
						vesselTypeUpdatedTopic, vesselTrackingAggByVesselTopic, alertService);
		// @formatter:on
	}

@@ -180,7 +186,7 @@ public class VesselCommandHandler extends CommandHandler {
		agg.setAggregateId(id);

		// Se procesa el comando, obteniendo el evento generado
		DeleteVesselEvent event = agg.process(cmd);
		CheckDeleteVesselEvent event = agg.process(cmd);

		// Si no se genera evento significa que no se va a aplicar
		if (event == null)
@@ -237,6 +243,12 @@ public class VesselCommandHandler extends CommandHandler {
		resolveCommand(event.getSessionId());
	}

	@KafkaHandler
	private void listen(DeleteVesselCheckedEvent event) {

		publishToKafka(VesselEventFactory.getEvent(event, VesselEventTypes.DELETE), vesselTopic);
	}

	@KafkaHandler
	private void listen(DeleteVesselConfirmedEvent event) {

@@ -278,6 +290,13 @@ public class VesselCommandHandler extends CommandHandler {
				ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
	}

	@KafkaHandler
	private void listen(DeleteVesselCheckFailedEvent event) {

		publishToKafka(VesselEventFactory.getEvent(event, VesselEventTypes.DELETE_CANCELLED, event.getExceptionType(),
				event.getArguments()), vesselTopic);
	}

	@KafkaHandler
	private void listen(DeleteVesselCancelledEvent event) {

+46 −7
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
import es.redmic.brokerlib.avro.common.EventTypes;
import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde;
import es.redmic.commandslib.exceptions.ExceptionType;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.streams.EventSourcingStreams;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
@@ -29,6 +30,7 @@ import es.redmic.vesselslib.events.vessel.create.CreateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.AggregationVesselInVesselTrackingPostUpdateEvent;
import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent;

public class VesselEventStreams extends EventSourcingStreams {
@@ -39,7 +41,9 @@ public class VesselEventStreams extends EventSourcingStreams {

	private String vesselTypeUpdatedTopic;

	private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerde;
	private String vesselTrackingAggByVesselTopic;

	private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerdeAggregationVesselTypeInVessel;

	private GlobalKTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType;

@@ -47,13 +51,19 @@ public class VesselEventStreams extends EventSourcingStreams {

	private KStream<String, Event> vesselTypeEvents;

	private HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent> hashMapSerdeAggregationVesselInVesselTracking;

	private KTable<String, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent>> aggByVessel;

	public VesselEventStreams(StreamConfig config, String vesselTypeTopic, String vesselsAggByVesselTypeTopic,
			String vesselTypeUpdatedTopic, AlertService alertService) {
			String vesselTypeUpdatedTopic, String vesselTrackingAggByVesselTopic, AlertService alertService) {
		super(config, alertService);
		this.vesselTypeTopic = vesselTypeTopic;
		this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic;
		this.vesselTypeUpdatedTopic = vesselTypeUpdatedTopic;
		this.hashMapSerde = new HashMapSerde<>(schemaRegistry);
		this.vesselTrackingAggByVesselTopic = vesselTrackingAggByVesselTopic;
		this.hashMapSerdeAggregationVesselTypeInVessel = new HashMapSerde<>(schemaRegistry);
		this.hashMapSerdeAggregationVesselInVesselTracking = new HashMapSerde<>(schemaRegistry);

		logger.info("Arrancado servicio de streaming para event sourcing de Vessel con Id: " + this.serviceId);
		init();
@@ -71,11 +81,14 @@ public class VesselEventStreams extends EventSourcingStreams {
		// Crea un store global para procesar los datos de todas las instancias de
		// vessels agregados por vesselType
		aggByVesselType = builder.globalTable(vesselsAggByVesselTypeTopic,
				Consumed.with(Serdes.String(), hashMapSerde));
				Consumed.with(Serdes.String(), hashMapSerdeAggregationVesselTypeInVessel));

		vesselType = builder.globalTable(vesselTypeTopic);

		vesselTypeEvents = builder.stream(vesselTypeUpdatedTopic);

		aggByVessel = builder.table(vesselTrackingAggByVesselTopic,
				Consumed.with(Serdes.String(), hashMapSerdeAggregationVesselInVesselTracking));
	}

	/*
@@ -185,7 +198,32 @@ public class VesselEventStreams extends EventSourcingStreams {
	 */
	@Override
	protected void processDeleteStream(KStream<String, Event> events) {
		// TODO: Implementar en relación a tracking
		// Stream filtrado por eventos de borrado
		KStream<String, Event> deleteEvents = events
				.filter((id, event) -> (EventTypes.CHECK_DELETE.equals(event.getType())));

		deleteEvents
				.leftJoin(aggByVessel, (deleteEvent,
						vesselTrackingAggByVessel) -> getCheckDeleteResultEvent(deleteEvent, vesselTrackingAggByVessel))
				.to(topic);
	}

	@SuppressWarnings("serial")
	private Event getCheckDeleteResultEvent(Event deleteEvent,
			HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> vesselTrackingAggByVessel) {

		if (vesselTrackingAggByVessel == null || vesselTrackingAggByVessel.isEmpty()) { // elemento no referenciado

			return VesselEventFactory.getEvent(deleteEvent, VesselEventTypes.DELETE_CHECKED);
		} else { // elemento referenciado

			return VesselEventFactory.getEvent(deleteEvent, VesselEventTypes.DELETE_CHECK_FAILED,
					ExceptionType.ITEM_REFERENCED.toString(), new HashMap<String, String>() {
						{
							put("id", deleteEvent.getAggregateId());
						}
					});
		}
	}

	/*
@@ -317,8 +355,9 @@ public class VesselEventStreams extends EventSourcingStreams {
		vesselEventsStream.groupByKey()
				.aggregate(HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>::new,
						(k, v, map) -> aggregateVesselsByVesselType(k, v, map),
						Materialized.with(Serdes.String(), hashMapSerde))
				.toStream().to(vesselsAggByVesselTypeTopic, Produced.with(Serdes.String(), hashMapSerde));
						Materialized.with(Serdes.String(), hashMapSerdeAggregationVesselTypeInVessel))
				.toStream().to(vesselsAggByVesselTypeTopic,
						Produced.with(Serdes.String(), hashMapSerdeAggregationVesselTypeInVessel));
	}

	private void processVesselTypePostUpdate() {