Loading vessels-commands/src/main/java/es/redmic/vesselscommands/aggregate/VesselTypeAggregate.java +4 −3 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.CheckDeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.VesselTypeDeletedEvent; import es.redmic.vesselslib.events.vesseltype.update.UpdateVesselTypeEvent; Loading Loading @@ -64,7 +64,7 @@ public class VesselTypeAggregate extends Aggregate { return evt; } public DeleteVesselTypeEvent process(DeleteVesselTypeCommand cmd) { public CheckDeleteVesselTypeEvent process(DeleteVesselTypeCommand cmd) { assert vesselTypeStateStore != null; Loading @@ -76,9 +76,10 @@ public class VesselTypeAggregate extends Aggregate { checkState(id, state.getType()); DeleteVesselTypeEvent evt = new DeleteVesselTypeEvent(); CheckDeleteVesselTypeEvent evt = new CheckDeleteVesselTypeEvent(); evt.setAggregateId(id); evt.setVersion(getVersion() + 1); return evt; } Loading vessels-commands/src/main/java/es/redmic/vesselscommands/handler/VesselTypeCommandHandler.java +28 −2 Original line number Diff line number Diff line Loading @@ -27,7 +27,10 @@ import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEv import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeFailedEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.CheckDeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckFailedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeConfirmedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.VesselTypeDeletedEvent; Loading @@ -48,6 +51,9 @@ public class VesselTypeCommandHandler extends CommandHandler { @Value("${broker.topic.vessel-type}") private String vesselTypeTopic; @Value("${broker.topic.vessels.agg.by.vesseltype}") private String vesselsAggByVesselTypeTopic; @Value("${broker.state.store.vesseltypes.dir}") private String stateStoreVesseltypesDir; Loading Loading @@ -92,7 +98,7 @@ public class VesselTypeCommandHandler extends CommandHandler { config .serviceId(vesseltypesEventsStreamId) .windowsTime(streamWindowsTime) .build(), alertService); .build(), vesselsAggByVesselTypeTopic, alertService); // @formatter:on } Loading Loading @@ -161,7 +167,7 @@ public class VesselTypeCommandHandler extends CommandHandler { agg.setAggregateId(id); // Se procesa el comando, obteniendo el evento generado DeleteVesselTypeEvent event = agg.process(cmd); CheckDeleteVesselTypeEvent event = agg.process(cmd); // Si no se genera evento significa que no se va a aplicar if (event == null) Loading Loading @@ -210,6 +216,14 @@ public class VesselTypeCommandHandler extends CommandHandler { resolveCommand(event.getSessionId()); } @KafkaHandler private void listen(DeleteVesselTypeCheckedEvent event) { logger.info("Enviando evento DeleteVesselTypeEvent para: " + event.getAggregateId()); publishToKafka(new DeleteVesselTypeEvent().buildFrom(event), vesselTypeTopic); } @KafkaHandler private void listen(DeleteVesselTypeConfirmedEvent event) { Loading Loading @@ -258,6 +272,18 @@ public class VesselTypeCommandHandler extends CommandHandler { ExceptionFactory.getException(event.getExceptionType(), event.getArguments())); } @KafkaHandler private void listen(DeleteVesselTypeCheckFailedEvent event) { logger.info("Enviando evento DeleteVesselTypeCheckFailedEvent para: " + event.getAggregateId()); DeleteVesselTypeCancelledEvent evt = new DeleteVesselTypeCancelledEvent().buildFrom(event); evt.setExceptionType(event.getExceptionType()); evt.setArguments(event.getArguments()); publishToKafka(evt, vesselTypeTopic); } @KafkaHandler private void listen(DeleteVesselTypeCancelledEvent event) { Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTypeEventStreams.java +55 −4 Original line number Diff line number Diff line package es.redmic.vesselscommands.streams; import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.common.EventError; import es.redmic.brokerlib.avro.common.EventTypes; import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde; import es.redmic.commandslib.exceptions.ExceptionType; import es.redmic.commandslib.streaming.common.StreamConfig; import es.redmic.commandslib.streaming.streams.EventSourcingStreams; import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckFailedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckedEvent; import es.redmic.vesselslib.events.vesseltype.update.UpdateVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.update.VesselTypeUpdatedEvent; public class VesselTypeEventStreams extends EventSourcingStreams { public VesselTypeEventStreams(StreamConfig config, AlertService alertService) { private String vesselsAggByVesselTypeTopic; private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerde; private KTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType; public VesselTypeEventStreams(StreamConfig config, String vesselsAggByVesselTypeTopic, AlertService alertService) { super(config, alertService); this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic; this.hashMapSerde = new HashMapSerde<>(schemaRegistry); logger.info("Arrancado servicio de streaming para event sourcing de VesselType con Id: " + this.serviceId); init(); } /* * (non-Javadoc) * Crea KTable de vessels agregados por vesseltype * * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams# * createExtraStreams() Loading @@ -32,8 +52,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams { @Override protected void createExtraStreams() { // No existen streams extra necesarios aggByVesselType = builder.table(vesselsAggByVesselTypeTopic, Consumed.with(Serdes.String(), hashMapSerde)); } /* Loading Loading @@ -88,6 +107,38 @@ public class VesselTypeEventStreams extends EventSourcingStreams { return successfulEvent; } /* * Comprueba si vesselType está referenciado en vessel para cancelar el borrado */ @Override protected void processDeleteStream(KStream<String, Event> events) { // Stream filtrado por eventos de borrado KStream<String, Event> deleteEvents = events .filter((id, event) -> (EventTypes.CHECK_DELETE.equals(event.getType()))); deleteEvents.leftJoin(aggByVesselType, (deleteEvent, vesselAggByVesselType) -> getDeleteResultEvent(deleteEvent, vesselAggByVesselType)); } private Event getDeleteResultEvent(Event deleteEvent, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> vesselAggByVesselType) { if (vesselAggByVesselType == null || vesselAggByVesselType.isEmpty()) { // elemento no referenciado return new DeleteVesselTypeCheckedEvent().buildFrom(deleteEvent); } else { // elemento referenciado DeleteVesselTypeCheckFailedEvent evt = new DeleteVesselTypeCheckFailedEvent().buildFrom(deleteEvent); evt.setExceptionType(ExceptionType.ITEM_REFERENCED.toString()); Map<String, String> arguments = new HashMap<>(); arguments.put("id", deleteEvent.getAggregateId()); evt.setArguments(arguments); return evt; } } /* * Función que a partir del último evento correcto + el evento de edición * parcial + la confirmación de la vista, si todo es correcto, genera evento Loading vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltype/VesselTypeCommandHandlerTest.java +69 −0 Original line number Diff line number Diff line Loading @@ -37,17 +37,22 @@ import es.redmic.brokerlib.listener.SendListener; import es.redmic.exception.data.DeleteItemException; import es.redmic.exception.data.ItemAlreadyExistException; import es.redmic.exception.data.ItemNotFoundException; import es.redmic.test.vesselscommands.integration.vessel.VesselDataUtil; import es.redmic.testutils.kafka.KafkaBaseIntegrationTest; import es.redmic.vesselscommands.VesselsCommandsApplication; import es.redmic.vesselscommands.handler.VesselTypeCommandHandler; import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeConfirmedEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeFailedEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.CheckDeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckFailedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeConfirmedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeFailedEvent; import es.redmic.vesselslib.events.vesseltype.delete.VesselTypeDeletedEvent; Loading Loading @@ -81,6 +86,9 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest { @Value("${broker.topic.vessel-type}") private String vessel_type_topic; @Value("${broker.topic.vessel}") private String vessel_topic; @Autowired private KafkaTemplate<String, Event> kafkaTemplate; Loading Loading @@ -148,6 +156,28 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest { assertEquals(updateVesselTypeEvent.getVesselType(), ((VesselTypeUpdatedEvent) confirm).getVesselType()); } // Envía un evento de comprobación de que el elemento puede ser borrado y debe // provocar un evento DeleteVesselTypeCheckedEvent ya que no está referenciado @Test public void checkDeleteVesselTypeEvent_SendDeleteVesselTypeCheckedEvent_IfReceivesSuccess() throws InterruptedException { logger.debug("----> CheckDeleteVesselTypeEvent"); CheckDeleteVesselTypeEvent event = VesselTypeDataUtil.getCheckDeleteVesselTypeEvent(code + "3a"); kafkaTemplate.send(vessel_type_topic, event.getAggregateId(), event); Event confirm = (Event) blockingQueue.poll(60, TimeUnit.SECONDS); assertNotNull(confirm); assertEquals(VesselTypeEventTypes.DELETE_CHECKED, confirm.getType()); assertEquals(event.getAggregateId(), confirm.getAggregateId()); assertEquals(event.getUserId(), confirm.getUserId()); assertEquals(event.getSessionId(), confirm.getSessionId()); assertEquals(event.getVersion(), confirm.getVersion()); } // Envía un evento de confirmación de borrado y debe provocar un evento Deleted @Test public void deleteVesselTypeConfirmedEvent_SendVesselTypeDeletedEvent_IfReceivesSuccess() Loading Loading @@ -235,6 +265,33 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest { assertEquals(vesselTypeUpdateEvent.getVesselType(), ((UpdateVesselTypeCancelledEvent) confirm).getVesselType()); } // Envía un evento de comprobación de que el elemento puede ser borrado y debe // provocar un evento DeleteVesselTypeCheckFailedEvent ya que está referenciado @Test public void checkDeleteVesselTypeEvent_SendDeleteVesselTypeCheckFailedEvent_IfReceivesSuccess() throws InterruptedException { logger.debug("----> DeleteVesselTypeCheckFailedEvent"); CheckDeleteVesselTypeEvent event = VesselTypeDataUtil.getCheckDeleteVesselTypeEvent(code + "5a"); VesselCreatedEvent vesselWithVesselTypeEvent = VesselDataUtil.getVesselCreatedEvent(1); vesselWithVesselTypeEvent.getVessel().setType(VesselTypeDataUtil.getVesselType(code + "5a")); kafkaTemplate.send(vessel_topic, vesselWithVesselTypeEvent.getAggregateId(), vesselWithVesselTypeEvent); kafkaTemplate.send(vessel_type_topic, event.getAggregateId(), event); Event confirm = (Event) blockingQueue.poll(60, TimeUnit.SECONDS); assertNotNull(confirm); assertEquals(VesselTypeEventTypes.DELETE_CHECK_FAILED, confirm.getType()); assertEquals(event.getAggregateId(), confirm.getAggregateId()); assertEquals(event.getUserId(), confirm.getUserId()); assertEquals(event.getSessionId(), confirm.getSessionId()); assertEquals(event.getVersion(), confirm.getVersion()); } // Envía un evento de error de borrado y debe provocar un evento Cancelled con // el item dentro @Test(expected = DeleteItemException.class) Loading Loading @@ -310,6 +367,18 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest { blockingQueue.offer(deleteVesselTypeCancelledEvent); } @KafkaHandler public void deleteVesselTypeCheckedEvent(DeleteVesselTypeCheckedEvent deleteVesselTypeCheckedEvent) { blockingQueue.offer(deleteVesselTypeCheckedEvent); } @KafkaHandler public void deleteVesselTypeCheckFailedEvent(DeleteVesselTypeCheckFailedEvent deleteVesselTypeCheckFailedEvent) { blockingQueue.offer(deleteVesselTypeCheckFailedEvent); } @KafkaHandler(isDefault = true) public void defaultEvent(Object def) { Loading vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltype/VesselTypeDataUtil.java +6 −0 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeConfirmedEv import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeFailedEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.CheckDeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeConfirmedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeFailedEvent; Loading Loading @@ -123,6 +124,11 @@ public abstract class VesselTypeDataUtil { return event; } public static CheckDeleteVesselTypeEvent getCheckDeleteVesselTypeEvent(String code) { return new CheckDeleteVesselTypeEvent().buildFrom(getDeleteEvent(code)); } public static DeleteVesselTypeConfirmedEvent getDeleteVesselTypeConfirmedEvent(String code) { return new DeleteVesselTypeConfirmedEvent().buildFrom(getDeleteEvent(code)); Loading Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/aggregate/VesselTypeAggregate.java +4 −3 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.CheckDeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.VesselTypeDeletedEvent; import es.redmic.vesselslib.events.vesseltype.update.UpdateVesselTypeEvent; Loading Loading @@ -64,7 +64,7 @@ public class VesselTypeAggregate extends Aggregate { return evt; } public DeleteVesselTypeEvent process(DeleteVesselTypeCommand cmd) { public CheckDeleteVesselTypeEvent process(DeleteVesselTypeCommand cmd) { assert vesselTypeStateStore != null; Loading @@ -76,9 +76,10 @@ public class VesselTypeAggregate extends Aggregate { checkState(id, state.getType()); DeleteVesselTypeEvent evt = new DeleteVesselTypeEvent(); CheckDeleteVesselTypeEvent evt = new CheckDeleteVesselTypeEvent(); evt.setAggregateId(id); evt.setVersion(getVersion() + 1); return evt; } Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/handler/VesselTypeCommandHandler.java +28 −2 Original line number Diff line number Diff line Loading @@ -27,7 +27,10 @@ import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEv import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeFailedEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.CheckDeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckFailedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeConfirmedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.VesselTypeDeletedEvent; Loading @@ -48,6 +51,9 @@ public class VesselTypeCommandHandler extends CommandHandler { @Value("${broker.topic.vessel-type}") private String vesselTypeTopic; @Value("${broker.topic.vessels.agg.by.vesseltype}") private String vesselsAggByVesselTypeTopic; @Value("${broker.state.store.vesseltypes.dir}") private String stateStoreVesseltypesDir; Loading Loading @@ -92,7 +98,7 @@ public class VesselTypeCommandHandler extends CommandHandler { config .serviceId(vesseltypesEventsStreamId) .windowsTime(streamWindowsTime) .build(), alertService); .build(), vesselsAggByVesselTypeTopic, alertService); // @formatter:on } Loading Loading @@ -161,7 +167,7 @@ public class VesselTypeCommandHandler extends CommandHandler { agg.setAggregateId(id); // Se procesa el comando, obteniendo el evento generado DeleteVesselTypeEvent event = agg.process(cmd); CheckDeleteVesselTypeEvent event = agg.process(cmd); // Si no se genera evento significa que no se va a aplicar if (event == null) Loading Loading @@ -210,6 +216,14 @@ public class VesselTypeCommandHandler extends CommandHandler { resolveCommand(event.getSessionId()); } @KafkaHandler private void listen(DeleteVesselTypeCheckedEvent event) { logger.info("Enviando evento DeleteVesselTypeEvent para: " + event.getAggregateId()); publishToKafka(new DeleteVesselTypeEvent().buildFrom(event), vesselTypeTopic); } @KafkaHandler private void listen(DeleteVesselTypeConfirmedEvent event) { Loading Loading @@ -258,6 +272,18 @@ public class VesselTypeCommandHandler extends CommandHandler { ExceptionFactory.getException(event.getExceptionType(), event.getArguments())); } @KafkaHandler private void listen(DeleteVesselTypeCheckFailedEvent event) { logger.info("Enviando evento DeleteVesselTypeCheckFailedEvent para: " + event.getAggregateId()); DeleteVesselTypeCancelledEvent evt = new DeleteVesselTypeCancelledEvent().buildFrom(event); evt.setExceptionType(event.getExceptionType()); evt.setArguments(event.getArguments()); publishToKafka(evt, vesselTypeTopic); } @KafkaHandler private void listen(DeleteVesselTypeCancelledEvent event) { Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTypeEventStreams.java +55 −4 Original line number Diff line number Diff line package es.redmic.vesselscommands.streams; import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.common.EventError; import es.redmic.brokerlib.avro.common.EventTypes; import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde; import es.redmic.commandslib.exceptions.ExceptionType; import es.redmic.commandslib.streaming.common.StreamConfig; import es.redmic.commandslib.streaming.streams.EventSourcingStreams; import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckFailedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckedEvent; import es.redmic.vesselslib.events.vesseltype.update.UpdateVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.update.VesselTypeUpdatedEvent; public class VesselTypeEventStreams extends EventSourcingStreams { public VesselTypeEventStreams(StreamConfig config, AlertService alertService) { private String vesselsAggByVesselTypeTopic; private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerde; private KTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType; public VesselTypeEventStreams(StreamConfig config, String vesselsAggByVesselTypeTopic, AlertService alertService) { super(config, alertService); this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic; this.hashMapSerde = new HashMapSerde<>(schemaRegistry); logger.info("Arrancado servicio de streaming para event sourcing de VesselType con Id: " + this.serviceId); init(); } /* * (non-Javadoc) * Crea KTable de vessels agregados por vesseltype * * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams# * createExtraStreams() Loading @@ -32,8 +52,7 @@ public class VesselTypeEventStreams extends EventSourcingStreams { @Override protected void createExtraStreams() { // No existen streams extra necesarios aggByVesselType = builder.table(vesselsAggByVesselTypeTopic, Consumed.with(Serdes.String(), hashMapSerde)); } /* Loading Loading @@ -88,6 +107,38 @@ public class VesselTypeEventStreams extends EventSourcingStreams { return successfulEvent; } /* * Comprueba si vesselType está referenciado en vessel para cancelar el borrado */ @Override protected void processDeleteStream(KStream<String, Event> events) { // Stream filtrado por eventos de borrado KStream<String, Event> deleteEvents = events .filter((id, event) -> (EventTypes.CHECK_DELETE.equals(event.getType()))); deleteEvents.leftJoin(aggByVesselType, (deleteEvent, vesselAggByVesselType) -> getDeleteResultEvent(deleteEvent, vesselAggByVesselType)); } private Event getDeleteResultEvent(Event deleteEvent, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> vesselAggByVesselType) { if (vesselAggByVesselType == null || vesselAggByVesselType.isEmpty()) { // elemento no referenciado return new DeleteVesselTypeCheckedEvent().buildFrom(deleteEvent); } else { // elemento referenciado DeleteVesselTypeCheckFailedEvent evt = new DeleteVesselTypeCheckFailedEvent().buildFrom(deleteEvent); evt.setExceptionType(ExceptionType.ITEM_REFERENCED.toString()); Map<String, String> arguments = new HashMap<>(); arguments.put("id", deleteEvent.getAggregateId()); evt.setArguments(arguments); return evt; } } /* * Función que a partir del último evento correcto + el evento de edición * parcial + la confirmación de la vista, si todo es correcto, genera evento Loading
vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltype/VesselTypeCommandHandlerTest.java +69 −0 Original line number Diff line number Diff line Loading @@ -37,17 +37,22 @@ import es.redmic.brokerlib.listener.SendListener; import es.redmic.exception.data.DeleteItemException; import es.redmic.exception.data.ItemAlreadyExistException; import es.redmic.exception.data.ItemNotFoundException; import es.redmic.test.vesselscommands.integration.vessel.VesselDataUtil; import es.redmic.testutils.kafka.KafkaBaseIntegrationTest; import es.redmic.vesselscommands.VesselsCommandsApplication; import es.redmic.vesselscommands.handler.VesselTypeCommandHandler; import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeConfirmedEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeFailedEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.CheckDeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckFailedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCheckedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeConfirmedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeFailedEvent; import es.redmic.vesselslib.events.vesseltype.delete.VesselTypeDeletedEvent; Loading Loading @@ -81,6 +86,9 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest { @Value("${broker.topic.vessel-type}") private String vessel_type_topic; @Value("${broker.topic.vessel}") private String vessel_topic; @Autowired private KafkaTemplate<String, Event> kafkaTemplate; Loading Loading @@ -148,6 +156,28 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest { assertEquals(updateVesselTypeEvent.getVesselType(), ((VesselTypeUpdatedEvent) confirm).getVesselType()); } // Envía un evento de comprobación de que el elemento puede ser borrado y debe // provocar un evento DeleteVesselTypeCheckedEvent ya que no está referenciado @Test public void checkDeleteVesselTypeEvent_SendDeleteVesselTypeCheckedEvent_IfReceivesSuccess() throws InterruptedException { logger.debug("----> CheckDeleteVesselTypeEvent"); CheckDeleteVesselTypeEvent event = VesselTypeDataUtil.getCheckDeleteVesselTypeEvent(code + "3a"); kafkaTemplate.send(vessel_type_topic, event.getAggregateId(), event); Event confirm = (Event) blockingQueue.poll(60, TimeUnit.SECONDS); assertNotNull(confirm); assertEquals(VesselTypeEventTypes.DELETE_CHECKED, confirm.getType()); assertEquals(event.getAggregateId(), confirm.getAggregateId()); assertEquals(event.getUserId(), confirm.getUserId()); assertEquals(event.getSessionId(), confirm.getSessionId()); assertEquals(event.getVersion(), confirm.getVersion()); } // Envía un evento de confirmación de borrado y debe provocar un evento Deleted @Test public void deleteVesselTypeConfirmedEvent_SendVesselTypeDeletedEvent_IfReceivesSuccess() Loading Loading @@ -235,6 +265,33 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest { assertEquals(vesselTypeUpdateEvent.getVesselType(), ((UpdateVesselTypeCancelledEvent) confirm).getVesselType()); } // Envía un evento de comprobación de que el elemento puede ser borrado y debe // provocar un evento DeleteVesselTypeCheckFailedEvent ya que está referenciado @Test public void checkDeleteVesselTypeEvent_SendDeleteVesselTypeCheckFailedEvent_IfReceivesSuccess() throws InterruptedException { logger.debug("----> DeleteVesselTypeCheckFailedEvent"); CheckDeleteVesselTypeEvent event = VesselTypeDataUtil.getCheckDeleteVesselTypeEvent(code + "5a"); VesselCreatedEvent vesselWithVesselTypeEvent = VesselDataUtil.getVesselCreatedEvent(1); vesselWithVesselTypeEvent.getVessel().setType(VesselTypeDataUtil.getVesselType(code + "5a")); kafkaTemplate.send(vessel_topic, vesselWithVesselTypeEvent.getAggregateId(), vesselWithVesselTypeEvent); kafkaTemplate.send(vessel_type_topic, event.getAggregateId(), event); Event confirm = (Event) blockingQueue.poll(60, TimeUnit.SECONDS); assertNotNull(confirm); assertEquals(VesselTypeEventTypes.DELETE_CHECK_FAILED, confirm.getType()); assertEquals(event.getAggregateId(), confirm.getAggregateId()); assertEquals(event.getUserId(), confirm.getUserId()); assertEquals(event.getSessionId(), confirm.getSessionId()); assertEquals(event.getVersion(), confirm.getVersion()); } // Envía un evento de error de borrado y debe provocar un evento Cancelled con // el item dentro @Test(expected = DeleteItemException.class) Loading Loading @@ -310,6 +367,18 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest { blockingQueue.offer(deleteVesselTypeCancelledEvent); } @KafkaHandler public void deleteVesselTypeCheckedEvent(DeleteVesselTypeCheckedEvent deleteVesselTypeCheckedEvent) { blockingQueue.offer(deleteVesselTypeCheckedEvent); } @KafkaHandler public void deleteVesselTypeCheckFailedEvent(DeleteVesselTypeCheckFailedEvent deleteVesselTypeCheckFailedEvent) { blockingQueue.offer(deleteVesselTypeCheckFailedEvent); } @KafkaHandler(isDefault = true) public void defaultEvent(Object def) { Loading
vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltype/VesselTypeDataUtil.java +6 −0 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeConfirmedEv import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeFailedEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.CheckDeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeConfirmedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeFailedEvent; Loading Loading @@ -123,6 +124,11 @@ public abstract class VesselTypeDataUtil { return event; } public static CheckDeleteVesselTypeEvent getCheckDeleteVesselTypeEvent(String code) { return new CheckDeleteVesselTypeEvent().buildFrom(getDeleteEvent(code)); } public static DeleteVesselTypeConfirmedEvent getDeleteVesselTypeConfirmedEvent(String code) { return new DeleteVesselTypeConfirmedEvent().buildFrom(getDeleteEvent(code)); Loading