Commit 41e2e50a authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade enriquecimiento de vessel vía eventsourcing

Envía vesseltype modificados a un topic intermedio
Modifica tests
parent 6e8827c6
Loading
Loading
Loading
Loading
+27 −21
Original line number Diff line number Diff line
@@ -2,7 +2,6 @@ package es.redmic.vesselscommands.aggregate;

import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.aggregate.Aggregate;
import es.redmic.exception.database.DBNotFoundException;
import es.redmic.vesselscommands.commands.vessel.CreateVesselCommand;
import es.redmic.vesselscommands.commands.vessel.DeleteVesselCommand;
import es.redmic.vesselscommands.commands.vessel.UpdateVesselCommand;
@@ -12,9 +11,11 @@ import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.common.VesselEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent;
import es.redmic.vesselslib.events.vessel.create.EnrichCreateVesselEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselEvent;
import es.redmic.vesselslib.events.vessel.delete.VesselDeletedEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent;
import es.redmic.vesselslib.events.vessel.update.EnrichUpdateVesselEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent;

public class VesselAggregate extends Aggregate {
@@ -27,7 +28,7 @@ public class VesselAggregate extends Aggregate {
		this.vesselStateStore = vesselStateStore;
	}

	public CreateVesselEvent process(CreateVesselCommand cmd) {
	public VesselEvent process(CreateVesselCommand cmd) {

		assert vesselStateStore != null;

@@ -39,13 +40,23 @@ public class VesselAggregate extends Aggregate {
		}
		this.setAggregateId(id);

		CreateVesselEvent evt = new CreateVesselEvent(cmd.getVessel());
		VesselEvent evt = null;

		if (cmd.getVessel().getType() != null) {

			logger.info("Creando evento para enriquecer Vessel");
			evt = new EnrichCreateVesselEvent(cmd.getVessel());
		} else {
			logger.info("Creando evento para crear Vessel. Enriquecimiento descartado");
			evt = new CreateVesselEvent(cmd.getVessel());
		}

		evt.setAggregateId(id);
		evt.setVersion(1);
		return evt;
	}

	public UpdateVesselEvent process(UpdateVesselCommand cmd) {
	public VesselEvent process(UpdateVesselCommand cmd) {

		assert vesselStateStore != null;

@@ -57,9 +68,19 @@ public class VesselAggregate extends Aggregate {

		checkState(id, state.getType());

		UpdateVesselEvent evt = new UpdateVesselEvent(cmd.getVessel());
		VesselEvent evt = null;

		if (cmd.getVessel().getType() != null) {

			logger.info("Creando evento para enriquecer Vessel");
			evt = new EnrichUpdateVesselEvent(cmd.getVessel());
		} else {
			logger.info("Creando evento para modificar Vessel. Enriquecimiento descartado");
			evt = new UpdateVesselEvent(cmd.getVessel());
		}

		evt.setAggregateId(id);
		evt.setVersion(getVersion() + 1);
		evt.setVersion(2);
		return evt;
	}

@@ -85,21 +106,6 @@ public class VesselAggregate extends Aggregate {
		return vessel;
	}

	public VesselDTO getVesselFromStateStore(VesselDTO type) {

		CreateVesselCommand cmd = new CreateVesselCommand(type);

		Event state = getItemFromStateStore(cmd.getVessel().getId());

		if (state == null) {
			throw new DBNotFoundException("id", cmd.getVessel().getId());
		}

		loadFromHistory(state);

		return getVessel();
	}

	@Override
	protected boolean isLocked(String eventType) {

+0 −16
Original line number Diff line number Diff line
@@ -2,7 +2,6 @@ package es.redmic.vesselscommands.aggregate;

import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.aggregate.Aggregate;
import es.redmic.exception.database.DBNotFoundException;
import es.redmic.vesselscommands.commands.vesseltype.CreateVesselTypeCommand;
import es.redmic.vesselscommands.commands.vesseltype.DeleteVesselTypeCommand;
import es.redmic.vesselscommands.commands.vesseltype.UpdateVesselTypeCommand;
@@ -87,21 +86,6 @@ public class VesselTypeAggregate extends Aggregate {
		return vesselType;
	}

	public VesselTypeDTO getVesselTypeFromStateStore(VesselTypeDTO type) {

		CreateVesselTypeCommand cmd = new CreateVesselTypeCommand(type);

		Event state = vesselTypeStateStore.getVesselType(cmd.getVesselType().getId());

		if (state == null) {
			throw new DBNotFoundException("id", cmd.getVesselType().getId());
		}

		loadFromHistory(state);

		return getVesselType();
	}

	@Override
	protected boolean isLocked(String eventType) {

+6 −0
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import org.joda.time.DateTime;

import es.redmic.commandslib.commands.Command;
import es.redmic.exception.databinding.FieldNotValidException;
import es.redmic.vesselscommands.commands.vesseltype.CreateVesselTypeCommand;
import es.redmic.vesselslib.dto.vessel.VesselDTO;

public class CreateVesselCommand extends Command {
@@ -28,6 +29,11 @@ public class CreateVesselCommand extends Command {
		vessel.setInserted(DateTime.now());
		vessel.setUpdated(DateTime.now());

		// Se añade id generado a vesselType para poder buscarlo
		if (vessel.getType() != null && vessel.getType().getId() == null) {
			vessel.getType().setId(new CreateVesselTypeCommand(vessel.getType()).getVesselType().getId());
		}

		this.setVessel(vessel);
	}

+7 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@ package es.redmic.vesselscommands.commands.vessel;
import org.joda.time.DateTime;

import es.redmic.commandslib.commands.Command;
import es.redmic.vesselscommands.commands.vesseltype.CreateVesselTypeCommand;
import es.redmic.vesselslib.dto.vessel.VesselDTO;

public class UpdateVesselCommand extends Command {
@@ -15,6 +16,12 @@ public class UpdateVesselCommand extends Command {
	public UpdateVesselCommand(VesselDTO vessel) {

		vessel.setUpdated(DateTime.now());

		// Se añade id generado a vesselType para poder buscarlo
		if (vessel.getType() != null && vessel.getType().getId() == null) {
			vessel.getType().setId(new CreateVesselTypeCommand(vessel.getType()).getVesselType().getId());
		}

		this.setVessel(vessel);
	}

+18 −21
Original line number Diff line number Diff line
@@ -25,8 +25,9 @@ import es.redmic.vesselscommands.streams.VesselEventStreams;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.events.vessel.VesselEventFactory;
import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.common.VesselEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselFailedEvent;
import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCancelledEvent;
@@ -34,7 +35,7 @@ import es.redmic.vesselslib.events.vessel.delete.DeleteVesselConfirmedEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselEvent;
import es.redmic.vesselslib.events.vessel.delete.VesselDeletedEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent;

@Component
@@ -50,6 +51,9 @@ public class VesselCommandHandler extends CommandHandler {
	@Value("${broker.topic.vessel}")
	private String vesselTopic;

	@Value("${broker.topic.vessel.type.updated}")
	private String vesselTypeUpdatedTopic;

	@Value("${broker.topic.vessels.agg.by.vesseltype}")
	private String vesselsAggByVesselTypeTopic;

@@ -75,9 +79,6 @@ public class VesselCommandHandler extends CommandHandler {

	private VesselStateStore vesselStateStore;

	@Autowired
	VesselTypeCommandHandler vesselTypeCommandHandler;

	@Autowired
	UserService userService;

@@ -107,7 +108,7 @@ public class VesselCommandHandler extends CommandHandler {
				config
					.serviceId(vesselsEventsStreamId)
					.windowsTime(streamWindowsTime)
					.build(), vesselTypeTopic, vesselsAggByVesselTypeTopic, alertService);
					.build(), vesselTypeTopic, vesselsAggByVesselTypeTopic, vesselTypeUpdatedTopic, alertService);
		// @formatter:on
	}

@@ -117,14 +118,8 @@ public class VesselCommandHandler extends CommandHandler {

		logger.debug("Procesando CreateVesselCommand");

		// Rellena el vesselType. Si es null lo descartamos
		if (cmd.getVessel().getType() != null)
			cmd.getVessel().setType(vesselTypeCommandHandler.getVesselType(cmd.getVessel().getType()));
		else // TODO: enviar correo a operador de datos.
			logger.warn("Vessel con id " + cmd.getVessel().getId() + " no tiene definido el tipo.");

		// Se procesa el comando, obteniendo el evento generado
		CreateVesselEvent event = agg.process(cmd);
		VesselEvent event = agg.process(cmd);

		// Si no se genera evento significa que no se debe aplicar
		if (event == null) {
@@ -154,11 +149,8 @@ public class VesselCommandHandler extends CommandHandler {

		VesselAggregate agg = new VesselAggregate(vesselStateStore);

		// El vesselType es obligado
		cmd.getVessel().setType(vesselTypeCommandHandler.getVesselType(cmd.getVessel().getType()));

		// Se procesa el comando, obteniendo el evento generado
		UpdateVesselEvent event = agg.process(cmd);
		VesselEvent event = agg.process(cmd);

		// Si no se genera evento significa que no se va a aplicar
		if (event == null)
@@ -206,11 +198,10 @@ public class VesselCommandHandler extends CommandHandler {
		return getResult(event.getSessionId(), completableFuture);
	}

	public VesselDTO getVessel(VesselDTO type) {

		VesselAggregate vesselAggregate = new VesselAggregate(vesselStateStore);
	@KafkaHandler
	private void listen(CreateVesselEnrichedEvent event) {

		return vesselAggregate.getVesselFromStateStore(type);
		publishToKafka(VesselEventFactory.getEvent(event, VesselEventTypes.CREATE, event.getVessel()), vesselTopic);
	}

	@KafkaHandler
@@ -223,6 +214,12 @@ public class VesselCommandHandler extends CommandHandler {
		resolveCommand(event.getSessionId());
	}

	@KafkaHandler
	private void listen(UpdateVesselEnrichedEvent event) {

		publishToKafka(VesselEventFactory.getEvent(event, VesselEventTypes.UPDATE, event.getVessel()), vesselTopic);
	}

	@KafkaHandler
	private void listen(VesselUpdatedEvent event) {

Loading