Commit bc7729a1 authored by Noel Alonso's avatar Noel Alonso
Browse files

Hace uso de la factoría para generar los eventos

parent a9dfa031
Loading
Loading
Loading
Loading
+5 −10
Original line number Diff line number Diff line
@@ -23,6 +23,8 @@ import es.redmic.vesselscommands.config.UserService;
import es.redmic.vesselscommands.statestore.VesselStateStore;
import es.redmic.vesselscommands.streams.VesselEventStreams;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.events.vessel.VesselEventFactory;
import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselFailedEvent;
@@ -234,9 +236,7 @@ public class VesselCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(DeleteVesselConfirmedEvent event) {

		logger.info("Enviando evento VesselDeletedEvent para: " + event.getAggregateId());

		publishToKafka(new VesselDeletedEvent().buildFrom(event), vesselTopic);
		publishToKafka(VesselEventFactory.getEvent(event, VesselEventTypes.DELETED), vesselTopic);
	}

	@KafkaHandler
@@ -250,13 +250,8 @@ public class VesselCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(CreateVesselFailedEvent event) {

		logger.info("Enviando evento CreateVesselCancelEvent para: " + event.getAggregateId());

		CreateVesselCancelledEvent evt = new CreateVesselCancelledEvent().buildFrom(event);
		evt.setExceptionType(event.getExceptionType());
		evt.setArguments(event.getArguments());

		publishToKafka(evt, vesselTopic);
		publishToKafka(VesselEventFactory.getEvent(event, VesselEventTypes.CREATE_CANCELLED, event.getExceptionType(),
				event.getArguments()), vesselTopic);
	}

	@KafkaHandler
+8 −21
Original line number Diff line number Diff line
@@ -23,6 +23,8 @@ import es.redmic.vesselscommands.config.UserService;
import es.redmic.vesselscommands.statestore.VesselTypeStateStore;
import es.redmic.vesselscommands.streams.VesselTypeEventStreams;
import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO;
import es.redmic.vesselslib.events.vesseltype.VesselTypeEventFactory;
import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes;
import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEvent;
import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent;
import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeFailedEvent;
@@ -32,7 +34,6 @@ import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEv
import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckFailedEvent;
import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckedEvent;
import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeConfirmedEvent;
import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeEvent;
import es.redmic.vesselslib.events.vesseltype.delete.VesselTypeDeletedEvent;
import es.redmic.vesselslib.events.vesseltype.update.UpdateVesselTypeCancelledEvent;
import es.redmic.vesselslib.events.vesseltype.update.UpdateVesselTypeEvent;
@@ -219,17 +220,13 @@ public class VesselTypeCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(DeleteVesselTypeCheckedEvent event) {

		logger.info("Enviando evento DeleteVesselTypeEvent para: " + event.getAggregateId());

		publishToKafka(new DeleteVesselTypeEvent().buildFrom(event), vesselTypeTopic);
		publishToKafka(VesselTypeEventFactory.getEvent(event, VesselTypeEventTypes.DELETE), vesselTypeTopic);
	}

	@KafkaHandler
	private void listen(DeleteVesselTypeConfirmedEvent event) {

		logger.info("Enviando evento VesselTypeDeletedEvent para: " + event.getAggregateId());

		publishToKafka(new VesselTypeDeletedEvent().buildFrom(event), vesselTypeTopic);
		publishToKafka(VesselTypeEventFactory.getEvent(event, VesselTypeEventTypes.DELETED), vesselTypeTopic);
	}

	@KafkaHandler
@@ -243,13 +240,8 @@ public class VesselTypeCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(CreateVesselTypeFailedEvent event) {

		logger.info("Enviando evento CreateVesselTypeCancelledEvent para: " + event.getAggregateId());

		CreateVesselTypeCancelledEvent evt = new CreateVesselTypeCancelledEvent().buildFrom(event);
		evt.setExceptionType(event.getExceptionType());
		evt.setArguments(event.getArguments());

		publishToKafka(evt, vesselTypeTopic);
		publishToKafka(VesselTypeEventFactory.getEvent(event, VesselTypeEventTypes.CREATE_CANCELLED,
				event.getExceptionType(), event.getArguments()), vesselTypeTopic);
	}

	@KafkaHandler
@@ -275,13 +267,8 @@ public class VesselTypeCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(DeleteVesselTypeCheckFailedEvent event) {

		logger.info("Enviando evento DeleteVesselTypeCheckFailedEvent para: " + event.getAggregateId());

		DeleteVesselTypeCancelledEvent evt = new DeleteVesselTypeCancelledEvent().buildFrom(event);
		evt.setExceptionType(event.getExceptionType());
		evt.setArguments(event.getArguments());

		publishToKafka(evt, vesselTypeTopic);
		publishToKafka(VesselTypeEventFactory.getEvent(event, VesselTypeEventTypes.DELETE_CANCELLED,
				event.getExceptionType(), event.getArguments()), vesselTypeTopic);
	}

	@KafkaHandler
+27 −52
Original line number Diff line number Diff line
@@ -21,14 +21,11 @@ import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.streams.EventSourcingStreams;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO;
import es.redmic.vesselslib.events.vessel.VesselEventFactory;
import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.common.VesselEvent;
import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent;
import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes;
import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent;

@@ -42,6 +39,8 @@ public class VesselEventStreams extends EventSourcingStreams {

	private GlobalKTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType;

	private KStream<String, Event> vesselTypeEvents;

	public VesselEventStreams(StreamConfig config, String vesselTypeTopic, String vesselsAggByVesselTypeTopic,
			AlertService alertService) {
		super(config, alertService);
@@ -54,7 +53,7 @@ public class VesselEventStreams extends EventSourcingStreams {
	}

	/*
	 * Crea stream de vessels agregados por vesseltype
	 * Crea GlobalKTable de vessels agregados por vesseltype
	 * 
	 * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams#
	 * createExtraStreams()
@@ -66,6 +65,8 @@ public class VesselEventStreams extends EventSourcingStreams {
		// vessels agregados por vesselType
		aggByVesselType = builder.globalTable(vesselsAggByVesselTypeTopic,
				Consumed.with(Serdes.String(), hashMapSerde));

		vesselTypeEvents = builder.stream(vesselTypeTopic);
	}

	/*
@@ -86,11 +87,7 @@ public class VesselEventStreams extends EventSourcingStreams {

		VesselDTO vessel = ((VesselEvent) requestEvent).getVessel();

		logger.info("Creando evento VesselCreatedEvent para: " + confirmedEvent.getAggregateId());

		VesselCreatedEvent successfulEvent = new VesselCreatedEvent().buildFrom(confirmedEvent);
		successfulEvent.setVessel(vessel);
		return successfulEvent;
		return VesselEventFactory.getEvent(confirmedEvent, VesselEventTypes.CREATED, vessel);
	}

	/*
@@ -111,11 +108,15 @@ public class VesselEventStreams extends EventSourcingStreams {

		VesselDTO vessel = ((VesselEvent) requestEvent).getVessel();

		logger.info("Creando evento VesselUpdatedEvent para: " + confirmedEvent.getAggregateId());
		return VesselEventFactory.getEvent(confirmedEvent, VesselEventTypes.UPDATED, vessel);
	}

		VesselUpdatedEvent successfulEvent = new VesselUpdatedEvent().buildFrom(confirmedEvent);
		successfulEvent.setVessel(vessel);
		return successfulEvent;
	/*
	 * Comprueba si vessel está referenciado en tracking para cancelar el borrado
	 */
	@Override
	protected void processDeleteStream(KStream<String, Event> events) {
		// TODO: Implementar en relación a tracking
	}

	/*
@@ -169,12 +170,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel();
		vessel.setType(partialUpdateConfirmEvent.getVesselType());

		logger.info("Creando evento VesselUpdatedEvent por una edición parcial de vesselType para: "
				+ partialUpdateConfirmEvent.getAggregateId());

		VesselUpdatedEvent successfulEvent = new VesselUpdatedEvent().buildFrom(partialUpdateConfirmEvent);
		successfulEvent.setVessel(vessel);
		return successfulEvent;
		return VesselEventFactory.getEvent(partialUpdateConfirmEvent, VesselEventTypes.UPDATED, vessel);
	}

	/*
@@ -194,16 +190,11 @@ public class VesselEventStreams extends EventSourcingStreams {

		EventError eventError = (EventError) failedEvent;

		logger.info("Enviando evento UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId());

		alertService.errorAlert("UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId(),
				eventError.getExceptionType() + " " + eventError.getArguments());

		UpdateVesselCancelledEvent cancelledEvent = new UpdateVesselCancelledEvent().buildFrom(failedEvent);
		cancelledEvent.setVessel(vessel);
		cancelledEvent.setExceptionType(eventError.getExceptionType());
		cancelledEvent.setArguments(eventError.getArguments());
		return cancelledEvent;
		return VesselEventFactory.getEvent(failedEvent, VesselEventTypes.UPDATE_CANCELLED, vessel,
				eventError.getExceptionType(), eventError.getArguments());
	}

	/*
@@ -223,13 +214,8 @@ public class VesselEventStreams extends EventSourcingStreams {

		EventError eventError = (EventError) failedEvent;

		logger.info("Enviando evento DeleteVesselCancelledEvent para: " + failedEvent.getAggregateId());

		DeleteVesselCancelledEvent cancelledEvent = new DeleteVesselCancelledEvent().buildFrom(failedEvent);
		cancelledEvent.setVessel(vessel);
		cancelledEvent.setExceptionType(eventError.getExceptionType());
		cancelledEvent.setArguments(eventError.getArguments());
		return cancelledEvent;
		return VesselEventFactory.getEvent(failedEvent, VesselEventTypes.DELETE_CANCELLED, vessel,
				eventError.getExceptionType(), eventError.getArguments());
	}

	/*
@@ -269,8 +255,6 @@ public class VesselEventStreams extends EventSourcingStreams {
	private void processVesselTypePostUpdate() {

		// Vesseltypes modificados
		KStream<String, Event> vesselTypeEvents = builder.stream(vesselTypeTopic);

		KStream<String, Event> updateReferenceEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventTypes.UPDATED.equals(event.getType())));

@@ -296,11 +280,12 @@ public class VesselEventStreams extends EventSourcingStreams {

		ArrayList<UpdateVesselTypeInVesselEvent> result = new ArrayList<>();

		VesselTypeDTO vesselType = ((VesselTypeEvent) updateReferenceEvent).getVesselType();

		for (Map.Entry<String, AggregationVesselTypeInVesselPostUpdateEvent> entry : vesselWithReferenceEvents
				.entrySet()) {

			AggregationVesselTypeInVesselPostUpdateEvent aggregationEvent = entry.getValue();
			VesselTypeDTO vesselType = ((VesselTypeEvent) updateReferenceEvent).getVesselType();

			if (VesselEventTypes.isLocked(aggregationEvent.getType())) {

@@ -313,25 +298,15 @@ public class VesselEventStreams extends EventSourcingStreams {
					alertService.errorAlert(aggregationEvent.getAggregateId(), message);
				}

			} else {

				logger.debug("Creando evento de update para Vessel " + aggregationEvent.getAggregateId()
						+ " por cambio en vesselType");
			} else if (!aggregationEvent.getVesselType().equals(vesselType)) {

				if (!aggregationEvent.getVesselType().equals(vesselType)) {

					UpdateVesselTypeInVesselEvent updateVesselType = new UpdateVesselTypeInVesselEvent();
					updateVesselType.setAggregateId(aggregationEvent.getAggregateId());
					updateVesselType.setUserId(updateReferenceEvent.getUserId());
					updateVesselType.setVersion(aggregationEvent.getVersion() + 1);
					updateVesselType.setVesselType(((VesselTypeEvent) updateReferenceEvent).getVesselType());
					result.add(updateVesselType);
				result.add((UpdateVesselTypeInVesselEvent) VesselEventFactory.getEvent(aggregationEvent,
						updateReferenceEvent, VesselEventTypes.UPDATE_VESSELTYPE));

			} else {
				logger.debug("VesselType ya estaba actualizado o los campos indexados no han cambiado ");
			}
		}
		}
		return result;
	}
}
+12 −40
Original line number Diff line number Diff line
package es.redmic.vesselscommands.streams;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import com.google.common.collect.ImmutableMap;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
@@ -18,14 +19,9 @@ import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.streams.EventSourcingStreams;
import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent;
import es.redmic.vesselslib.events.vesseltype.VesselTypeEventFactory;
import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes;
import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent;
import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent;
import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent;
import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckFailedEvent;
import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckedEvent;
import es.redmic.vesselslib.events.vesseltype.update.UpdateVesselTypeCancelledEvent;
import es.redmic.vesselslib.events.vesseltype.update.VesselTypeUpdatedEvent;

public class VesselTypeEventStreams extends EventSourcingStreams {

@@ -73,11 +69,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams {

		VesselTypeDTO vesselType = ((VesselTypeEvent) requestEvent).getVesselType();

		logger.info("Enviando evento VesselTypeCreatedEvent para: " + confirmedEvent.getAggregateId());

		VesselTypeCreatedEvent successfulEvent = new VesselTypeCreatedEvent().buildFrom(confirmedEvent);
		successfulEvent.setVesselType(vesselType);
		return successfulEvent;
		return VesselTypeEventFactory.getEvent(confirmedEvent, VesselTypeEventTypes.CREATED, vesselType);
	}

	/*
@@ -96,15 +88,9 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
			return null;
		}

		logger.debug("Creando evento de modificado exitoso para VesselType");

		VesselTypeDTO vesselType = ((VesselTypeEvent) requestEvent).getVesselType();

		logger.info("Enviando evento VesselTypeUpdatedEvent para: " + confirmedEvent.getAggregateId());

		VesselTypeUpdatedEvent successfulEvent = new VesselTypeUpdatedEvent().buildFrom(confirmedEvent);
		successfulEvent.setVesselType(vesselType);
		return successfulEvent;
		return VesselTypeEventFactory.getEvent(requestEvent, VesselTypeEventTypes.UPDATED, vesselType);
	}

	/*
@@ -127,15 +113,11 @@ public class VesselTypeEventStreams extends EventSourcingStreams {

		if (vesselAggByVesselType == null || vesselAggByVesselType.isEmpty()) { // elemento no referenciado

			return new DeleteVesselTypeCheckedEvent().buildFrom(deleteEvent);
			return VesselTypeEventFactory.getEvent(deleteEvent, VesselTypeEventTypes.DELETE_CHECKED);
		} else { // elemento referenciado

			DeleteVesselTypeCheckFailedEvent evt = new DeleteVesselTypeCheckFailedEvent().buildFrom(deleteEvent);
			evt.setExceptionType(ExceptionType.ITEM_REFERENCED.toString());
			Map<String, String> arguments = new HashMap<>();
			arguments.put("id", deleteEvent.getAggregateId());
			evt.setArguments(arguments);
			return evt;
			return VesselTypeEventFactory.getEvent(deleteEvent, VesselTypeEventTypes.DELETE_CHECK_FAILED,
					ExceptionType.ITEM_REFERENCED.toString(), ImmutableMap.of("id", deleteEvent.getAggregateId()));
		}
	}

@@ -167,13 +149,8 @@ public class VesselTypeEventStreams extends EventSourcingStreams {

		EventError eventError = (EventError) failedEvent;

		logger.info("Enviando evento UpdateVesselTypeCancelledEvent para: " + failedEvent.getAggregateId());

		UpdateVesselTypeCancelledEvent cancelledEvent = new UpdateVesselTypeCancelledEvent().buildFrom(failedEvent);
		cancelledEvent.setVesselType(vesselType);
		cancelledEvent.setExceptionType(eventError.getExceptionType());
		cancelledEvent.setArguments(eventError.getArguments());
		return cancelledEvent;
		return VesselTypeEventFactory.getEvent(failedEvent, VesselTypeEventTypes.UPDATE_CANCELLED, vesselType,
				eventError.getExceptionType(), eventError.getArguments());
	}

	/*
@@ -193,13 +170,8 @@ public class VesselTypeEventStreams extends EventSourcingStreams {

		EventError eventError = (EventError) failedEvent;

		logger.info("Enviar evento DeleteVesselTypeCancelledEvent para: " + failedEvent.getAggregateId());

		DeleteVesselTypeCancelledEvent cancelledEvent = new DeleteVesselTypeCancelledEvent().buildFrom(failedEvent);
		cancelledEvent.setVesselType(vesselType);
		cancelledEvent.setExceptionType(eventError.getExceptionType());
		cancelledEvent.setArguments(eventError.getArguments());
		return cancelledEvent;
		return VesselTypeEventFactory.getEvent(failedEvent, VesselTypeEventTypes.DELETE_CANCELLED, vesselType,
				eventError.getExceptionType(), eventError.getArguments());
	}

	/*