Loading vessels-commands/src/main/java/es/redmic/vesselscommands/aggregate/VesselAggregate.java +12 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent; import es.redmic.vesselslib.events.vessel.delete.DeleteVesselEvent; import es.redmic.vesselslib.events.vessel.delete.VesselDeletedEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent; public class VesselAggregate extends Aggregate { Loading Loading @@ -149,12 +150,23 @@ public class VesselAggregate extends Aggregate { logger.debug("Compensación por edición/borrado fallido"); apply((VesselEvent) history); break; case "UPDATE_VESSELTYPE": logger.debug("En fase de edición parcial de veseltype en vessel"); apply(history); break; default: super._loadFromHistory(history); break; } } public void apply(UpdateVesselTypeInVesselEvent event) { if (this.vessel == null) this.vessel = new VesselDTO(); this.vessel.setType(event.getVesselType()); super.apply(event); } public void apply(CreateVesselCancelledEvent event) { this.deleted = true; apply(event); Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselEventStreams.java +73 −13 Original line number Diff line number Diff line Loading @@ -22,14 +22,15 @@ import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselEvent; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent; import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; public class VesselEventStreams extends EventStreams { private static final String REDMIC_PROCESS = "REDMIC_PROCESS"; private String vesselTypeTopic; public VesselEventStreams(StreamConfig config, String vesselTypeTopic, AlertService alertService) { Loading Loading @@ -91,6 +92,8 @@ public class VesselEventStreams extends EventStreams { updateConfirmedEvents.join(updateRequestEvents, (confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent), JoinWindows.of(windowsTime)).to(topic); processPartialUpdatedStream(vesselEvents, updateConfirmedEvents); } private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) { Loading @@ -112,6 +115,61 @@ public class VesselEventStreams extends EventStreams { return successfulEvent; } private void processPartialUpdatedStream(KStream<String, Event> vesselEvents, KStream<String, Event> updateConfirmedEvents) { // Stream filtrado por eventos de petición de modificar vesseltype KStream<String, Event> updateRequestEvents = vesselEvents .filter((id, event) -> (VesselEventTypes.UPDATE_VESSELTYPE.equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito KStream<String, UpdateVesselTypeInVesselEvent> partialUpdateEvent = updateConfirmedEvents.join( updateRequestEvents, (confirmedEvent, requestEvent) -> checkConfirmPartialUpdate(confirmedEvent, requestEvent), JoinWindows.of(windowsTime)); // Stream filtrado por eventos de creaciones y modificaciones correctos (solo el // último que se produzca por id) KStream<String, Event> successEvents = vesselEvents .filter((id, event) -> (VesselEventTypes.CREATED.equals(event.getType()) || VesselEventTypes.UPDATED.equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); // Join por id, mandando a kafka el evento de compensación partialUpdateEvent.join(successEventsTable, (partialUpdateConfirmEvent, lastSuccessEvent) -> getUpdatedEventFromPartialUpdate(partialUpdateConfirmEvent, lastSuccessEvent)) .to(topic); } // Comprueba si la confirmación corresponde con el evento enviado. private UpdateVesselTypeInVesselEvent checkConfirmPartialUpdate(Event confirmedEvent, Event requestEvent) { if (!isSameSession(confirmedEvent, requestEvent)) { return null; } return (UpdateVesselTypeInVesselEvent) requestEvent; } private Event getUpdatedEventFromPartialUpdate(UpdateVesselTypeInVesselEvent partialUpdateConfirmEvent, Event lastSuccessEvent) { assert (lastSuccessEvent.getType().equals(VesselEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselEventTypes.UPDATED)); assert partialUpdateConfirmEvent.getType().equals(VesselEventTypes.UPDATE_VESSELTYPE); VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel(); vessel.setType(partialUpdateConfirmEvent.getVesselType()); logger.info("Creando evento VesselUpdatedEvent por una edición parcial de vesselType para: " + partialUpdateConfirmEvent.getAggregateId()); VesselUpdatedEvent successfulEvent = new VesselUpdatedEvent().buildFrom(partialUpdateConfirmEvent); successfulEvent.setVessel(vessel); return successfulEvent; } @Override protected void processFailedChangeStream(KStream<String, Event> vesselEvents) { Loading Loading @@ -171,6 +229,10 @@ public class VesselEventStreams extends EventStreams { logger.info("Enviando evento UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId()); if (failedEvent.getUserId().equals(REDMIC_PROCESS)) alertService.errorAlert("UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId(), eventError.getExceptionType() + " " + eventError.getArguments()); UpdateVesselCancelledEvent cancelledEvent = new UpdateVesselCancelledEvent().buildFrom(failedEvent); cancelledEvent.setVessel(vessel); cancelledEvent.setExceptionType(eventError.getExceptionType()); Loading Loading @@ -224,18 +286,18 @@ public class VesselEventStreams extends EventStreams { KStream<String, Event> updateReferenceEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.UPDATED.equals(event.getType()))); KStream<String, ArrayList<VesselEvent>> join = updateReferenceEvents.join(vesselEventsTable, KStream<String, ArrayList<UpdateVesselTypeInVesselEvent>> join = updateReferenceEvents.join(vesselEventsTable, (updateReferenceEvent, vesselWithReferenceEvents) -> getPostUpdateEvent(updateReferenceEvent, vesselWithReferenceEvents)); // desagregar, cambiar clave por la de vessel y enviar a topic join.flatMapValues(value -> value).selectKey((k, v) -> v.getVessel().getId()).to(topic); join.flatMapValues(value -> value).selectKey((k, v) -> v.getAggregateId()).to(topic); } private ArrayList<VesselEvent> getPostUpdateEvent(Event updateReferenceEvent, private ArrayList<UpdateVesselTypeInVesselEvent> getPostUpdateEvent(Event updateReferenceEvent, HashMap<String, VesselEvent> vesselWithReferenceEvents) { ArrayList<VesselEvent> result = new ArrayList<>(); ArrayList<UpdateVesselTypeInVesselEvent> result = new ArrayList<>(); for (Map.Entry<String, VesselEvent> entry : vesselWithReferenceEvents.entrySet()) { Loading @@ -260,15 +322,13 @@ public class VesselEventStreams extends EventStreams { if (!vesselEvent.getVessel().getType().equals(vesselType)) { UpdateVesselEvent updateVesselEvent = new UpdateVesselEvent(); updateVesselEvent.setAggregateId(vesselEvent.getAggregateId()); updateVesselEvent.setUserId(updateReferenceEvent.getUserId()); updateVesselEvent.setVersion(vesselEvent.getVersion() + 1); UpdateVesselTypeInVesselEvent updateVesselType = new UpdateVesselTypeInVesselEvent(); updateVesselType.setAggregateId(vesselEvent.getAggregateId()); updateVesselType.setUserId(updateReferenceEvent.getUserId()); updateVesselType.setVersion(vesselEvent.getVersion() + 1); updateVesselType.setVesselType(((VesselTypeEvent) updateReferenceEvent).getVesselType()); result.add(updateVesselType); VesselDTO vessel = vesselEvent.getVessel(); vessel.setType(((VesselTypeEvent) updateReferenceEvent).getVesselType()); updateVesselEvent.setVessel(vessel); result.add(updateVesselEvent); } else { logger.debug("VesselType ya estaba actualizado o los campos indexados no han cambiado "); } Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTypeEventStreams.java +3 −3 Original line number Diff line number Diff line Loading @@ -106,7 +106,7 @@ public class VesselTypeEventStreams extends EventStreams { // último que se produzca por id) KStream<String, Event> successEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.CREATED.toString().equals(event.getType()) || VesselTypeEventTypes.VESSELTYPE_UPDATED.toString().equals(event.getType()))); || VesselTypeEventTypes.UPDATED.toString().equals(event.getType()))); processUpdateFailedStream(vesselTypeEvents, successEvents); Loading Loading @@ -151,7 +151,7 @@ public class VesselTypeEventStreams extends EventStreams { assert failedEvent.getType().equals(VesselTypeEventTypes.UPDATE_FAILED); assert lastSuccessEvent.getType().equals(VesselTypeEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselTypeEventTypes.VESSELTYPE_UPDATED); || lastSuccessEvent.getType().equals(VesselTypeEventTypes.UPDATED); VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType(); Loading @@ -171,7 +171,7 @@ public class VesselTypeEventStreams extends EventStreams { assert failedEvent.getType().equals(VesselTypeEventTypes.DELETE_FAILED); assert lastSuccessEvent.getType().equals(VesselTypeEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselTypeEventTypes.VESSELTYPE_UPDATED); || lastSuccessEvent.getType().equals(VesselTypeEventTypes.UPDATED); VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType(); Loading vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vessel/VesselPostUpdateHandlerTest.java +73 −16 Original line number Diff line number Diff line Loading @@ -4,6 +4,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; Loading @@ -29,13 +31,18 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import es.redmic.brokerlib.alert.AlertType; import es.redmic.brokerlib.alert.Message; import es.redmic.brokerlib.avro.common.Event; import es.redmic.exception.common.ExceptionType; import es.redmic.test.vesselscommands.integration.vesseltype.VesselTypeDataUtil; import es.redmic.testutils.kafka.KafkaBaseIntegrationTest; import es.redmic.vesselscommands.VesselsCommandsApplication; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselCancelledEvent; import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselConfirmedEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselFailedEvent; import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent; import es.redmic.vesselslib.events.vesseltype.update.VesselTypeUpdatedEvent; @RunWith(SpringJUnit4ClassRunner.class) Loading Loading @@ -70,6 +77,10 @@ public class VesselPostUpdateHandlerTest extends KafkaBaseIntegrationTest { protected BlockingQueue<Object> blockingQueueForAlerts; protected BlockingQueue<Object> blockingQueueForUpdatedEvents; protected BlockingQueue<Object> blockingQueueForCancelledEvents; @PostConstruct public void VesselPostUpdateHandlerTestPostConstruct() throws Exception { Loading @@ -82,6 +93,10 @@ public class VesselPostUpdateHandlerTest extends KafkaBaseIntegrationTest { blockingQueue = new LinkedBlockingDeque<>(); blockingQueueForAlerts = new LinkedBlockingDeque<>(); blockingQueueForUpdatedEvents = new LinkedBlockingDeque<>(); blockingQueueForCancelledEvents = new LinkedBlockingDeque<>(); } // PostUpdate Loading @@ -98,58 +113,100 @@ public class VesselPostUpdateHandlerTest extends KafkaBaseIntegrationTest { // Envía created para que genere un evento postUpdate y lo saca de la cola VesselCreatedEvent vesselCreatedEvent = VesselDataUtil.getVesselCreatedEvent(mmsi + 7); kafkaTemplate.send(VESSEL_TOPIC, vesselCreatedEvent.getAggregateId(), vesselCreatedEvent); Thread.sleep(2000); // Envía created para que genere un evento postUpdate y lo saca de la cola VesselCreatedEvent vesselCreatedEvent2 = VesselDataUtil.getVesselCreatedEvent(mmsi + 8); kafkaTemplate.send(VESSEL_TOPIC, vesselCreatedEvent2.getAggregateId(), vesselCreatedEvent2); Thread.sleep(2000); // Envía created con vesselType actualizado para comprobar que no genera evento VesselCreatedEvent vesselCreatedEvent3 = VesselDataUtil.getVesselCreatedEvent(mmsi + 9); vesselCreatedEvent3.getVessel().setType(vesselTypeUpdatedEvent.getVesselType()); kafkaTemplate.send(VESSEL_TOPIC, vesselCreatedEvent3.getAggregateId(), vesselCreatedEvent3); Thread.sleep(2000); // Envía create para simular uno a medias en el stream y lo saca de la cola CreateVesselEvent createVesselEvent = VesselDataUtil.getCreateEvent(mmsi + 10); kafkaTemplate.send(VESSEL_TOPIC, createVesselEvent.getAggregateId(), createVesselEvent); Thread.sleep(2000); // Envía evento de vesselType actualizado para que genere los eventos de // postUpdate kafkaTemplate.send(VESSELTYPE_TOPIC, vesselTypeUpdatedEvent.getAggregateId(), vesselTypeUpdatedEvent); Thread.sleep(2000); Event update = (Event) blockingQueue.poll(30, TimeUnit.SECONDS); UpdateVesselTypeInVesselEvent update = (UpdateVesselTypeInVesselEvent) blockingQueue.poll(30, TimeUnit.SECONDS); assertNotNull(update); assertEquals(VesselEventTypes.UPDATE, update.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), ((UpdateVesselEvent) update).getVessel().getType()); assertEquals(vesselCreatedEvent.getAggregateId(), ((UpdateVesselEvent) update).getAggregateId()); assertEquals(VesselEventTypes.UPDATE_VESSELTYPE, update.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), update.getVesselType()); assertEquals(vesselCreatedEvent.getAggregateId(), update.getAggregateId()); Event update2 = (Event) blockingQueue.poll(30, TimeUnit.SECONDS); assertNotNull(update2); assertEquals(VesselEventTypes.UPDATE, update2.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), ((UpdateVesselEvent) update2).getVessel().getType()); assertEquals(vesselCreatedEvent2.getAggregateId(), ((UpdateVesselEvent) update2).getAggregateId()); // Envía confirmación para simular que view lo insertó kafkaTemplate.send(VESSEL_TOPIC, update.getAggregateId(), new UpdateVesselConfirmedEvent().buildFrom(update)); Event update3 = (Event) blockingQueue.poll(20, TimeUnit.SECONDS); UpdateVesselTypeInVesselEvent update2 = (UpdateVesselTypeInVesselEvent) blockingQueue.poll(30, TimeUnit.SECONDS); assertNotNull(update2); assertEquals(VesselEventTypes.UPDATE_VESSELTYPE, update2.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), update2.getVesselType()); assertEquals(vesselCreatedEvent2.getAggregateId(), update2.getAggregateId()); // Envía fallo para simular que view no lo insertó UpdateVesselFailedEvent updateVesselFailedEvent = new UpdateVesselFailedEvent().buildFrom(update2); updateVesselFailedEvent.setExceptionType(ExceptionType.ITEM_NOT_FOUND.name()); Map<String, String> arguments = new HashMap<>(); arguments.put("A", "B"); updateVesselFailedEvent.setArguments(arguments); kafkaTemplate.send(VESSEL_TOPIC, update2.getAggregateId(), updateVesselFailedEvent); Event update3 = (Event) blockingQueue.poll(30, TimeUnit.SECONDS); assertNull(update3); // LLegó un mensaje de alerta Message message = (Message) blockingQueueForAlerts.poll(30, TimeUnit.SECONDS); assertNotNull(message); assertEquals(AlertType.ERROR.name(), message.getType()); // Se modificó bien el primer vessel VesselUpdatedEvent updated = (VesselUpdatedEvent) blockingQueueForUpdatedEvents.poll(30, TimeUnit.SECONDS); assertNotNull(updated); assertEquals(VesselEventTypes.UPDATED, updated.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), updated.getVessel().getType()); assertEquals(vesselCreatedEvent.getAggregateId(), updated.getAggregateId()); // No se modificó bien el segundo vessel VesselCancelledEvent cancelled = (VesselCancelledEvent) blockingQueueForCancelledEvents.poll(30, TimeUnit.SECONDS); assertNotNull(cancelled); assertEquals(VesselEventTypes.UPDATE_CANCELLED, cancelled.getType()); assertEquals(vesselCreatedEvent2.getVessel().getType(), cancelled.getVessel().getType()); assertEquals(vesselCreatedEvent2.getAggregateId(), cancelled.getAggregateId()); // LLegó un mensaje de alerta Message message2 = (Message) blockingQueueForAlerts.poll(30, TimeUnit.SECONDS); assertNotNull(message2); assertEquals(AlertType.ERROR.name(), message2.getType()); } @KafkaHandler public void updateVesselEventFromPostUpdate(UpdateVesselEvent updateVesselEvent) { public void updateVesselEventFromPostUpdate(UpdateVesselTypeInVesselEvent updateVesselEvent) { blockingQueue.offer(updateVesselEvent); } @KafkaHandler public void vesselUpdatedEventFromPostUpdate(VesselUpdatedEvent vesselUpdatedEvent) { blockingQueueForUpdatedEvents.offer(vesselUpdatedEvent); } @KafkaHandler public void vesselCancelledEventFromPostUpdate(VesselCancelledEvent vesselCancelledEvent) { blockingQueueForCancelledEvents.offer(vesselCancelledEvent); } @KafkaListener(topics = "${broker.topic.alert}", groupId = "test") public void errorAlert(Message message) { blockingQueueForAlerts.offer(message); Loading vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltype/VesselTypeDataUtil.java +1 −1 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ import es.redmic.vesselslib.events.vesseltype.update.VesselTypeUpdatedEvent; public abstract class VesselTypeDataUtil { public final static String PREFIX = "vesseltype-code-", USER = "1"; public final static String PREFIX = "vesseltype-code-", USER = "REDMIC_PROCESS"; public static CreateVesselTypeEvent getCreateEvent(String code) { Loading Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/aggregate/VesselAggregate.java +12 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent; import es.redmic.vesselslib.events.vessel.delete.DeleteVesselEvent; import es.redmic.vesselslib.events.vessel.delete.VesselDeletedEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent; public class VesselAggregate extends Aggregate { Loading Loading @@ -149,12 +150,23 @@ public class VesselAggregate extends Aggregate { logger.debug("Compensación por edición/borrado fallido"); apply((VesselEvent) history); break; case "UPDATE_VESSELTYPE": logger.debug("En fase de edición parcial de veseltype en vessel"); apply(history); break; default: super._loadFromHistory(history); break; } } public void apply(UpdateVesselTypeInVesselEvent event) { if (this.vessel == null) this.vessel = new VesselDTO(); this.vessel.setType(event.getVesselType()); super.apply(event); } public void apply(CreateVesselCancelledEvent event) { this.deleted = true; apply(event); Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselEventStreams.java +73 −13 Original line number Diff line number Diff line Loading @@ -22,14 +22,15 @@ import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselEvent; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent; import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; public class VesselEventStreams extends EventStreams { private static final String REDMIC_PROCESS = "REDMIC_PROCESS"; private String vesselTypeTopic; public VesselEventStreams(StreamConfig config, String vesselTypeTopic, AlertService alertService) { Loading Loading @@ -91,6 +92,8 @@ public class VesselEventStreams extends EventStreams { updateConfirmedEvents.join(updateRequestEvents, (confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent), JoinWindows.of(windowsTime)).to(topic); processPartialUpdatedStream(vesselEvents, updateConfirmedEvents); } private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) { Loading @@ -112,6 +115,61 @@ public class VesselEventStreams extends EventStreams { return successfulEvent; } private void processPartialUpdatedStream(KStream<String, Event> vesselEvents, KStream<String, Event> updateConfirmedEvents) { // Stream filtrado por eventos de petición de modificar vesseltype KStream<String, Event> updateRequestEvents = vesselEvents .filter((id, event) -> (VesselEventTypes.UPDATE_VESSELTYPE.equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito KStream<String, UpdateVesselTypeInVesselEvent> partialUpdateEvent = updateConfirmedEvents.join( updateRequestEvents, (confirmedEvent, requestEvent) -> checkConfirmPartialUpdate(confirmedEvent, requestEvent), JoinWindows.of(windowsTime)); // Stream filtrado por eventos de creaciones y modificaciones correctos (solo el // último que se produzca por id) KStream<String, Event> successEvents = vesselEvents .filter((id, event) -> (VesselEventTypes.CREATED.equals(event.getType()) || VesselEventTypes.UPDATED.equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); // Join por id, mandando a kafka el evento de compensación partialUpdateEvent.join(successEventsTable, (partialUpdateConfirmEvent, lastSuccessEvent) -> getUpdatedEventFromPartialUpdate(partialUpdateConfirmEvent, lastSuccessEvent)) .to(topic); } // Comprueba si la confirmación corresponde con el evento enviado. private UpdateVesselTypeInVesselEvent checkConfirmPartialUpdate(Event confirmedEvent, Event requestEvent) { if (!isSameSession(confirmedEvent, requestEvent)) { return null; } return (UpdateVesselTypeInVesselEvent) requestEvent; } private Event getUpdatedEventFromPartialUpdate(UpdateVesselTypeInVesselEvent partialUpdateConfirmEvent, Event lastSuccessEvent) { assert (lastSuccessEvent.getType().equals(VesselEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselEventTypes.UPDATED)); assert partialUpdateConfirmEvent.getType().equals(VesselEventTypes.UPDATE_VESSELTYPE); VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel(); vessel.setType(partialUpdateConfirmEvent.getVesselType()); logger.info("Creando evento VesselUpdatedEvent por una edición parcial de vesselType para: " + partialUpdateConfirmEvent.getAggregateId()); VesselUpdatedEvent successfulEvent = new VesselUpdatedEvent().buildFrom(partialUpdateConfirmEvent); successfulEvent.setVessel(vessel); return successfulEvent; } @Override protected void processFailedChangeStream(KStream<String, Event> vesselEvents) { Loading Loading @@ -171,6 +229,10 @@ public class VesselEventStreams extends EventStreams { logger.info("Enviando evento UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId()); if (failedEvent.getUserId().equals(REDMIC_PROCESS)) alertService.errorAlert("UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId(), eventError.getExceptionType() + " " + eventError.getArguments()); UpdateVesselCancelledEvent cancelledEvent = new UpdateVesselCancelledEvent().buildFrom(failedEvent); cancelledEvent.setVessel(vessel); cancelledEvent.setExceptionType(eventError.getExceptionType()); Loading Loading @@ -224,18 +286,18 @@ public class VesselEventStreams extends EventStreams { KStream<String, Event> updateReferenceEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.UPDATED.equals(event.getType()))); KStream<String, ArrayList<VesselEvent>> join = updateReferenceEvents.join(vesselEventsTable, KStream<String, ArrayList<UpdateVesselTypeInVesselEvent>> join = updateReferenceEvents.join(vesselEventsTable, (updateReferenceEvent, vesselWithReferenceEvents) -> getPostUpdateEvent(updateReferenceEvent, vesselWithReferenceEvents)); // desagregar, cambiar clave por la de vessel y enviar a topic join.flatMapValues(value -> value).selectKey((k, v) -> v.getVessel().getId()).to(topic); join.flatMapValues(value -> value).selectKey((k, v) -> v.getAggregateId()).to(topic); } private ArrayList<VesselEvent> getPostUpdateEvent(Event updateReferenceEvent, private ArrayList<UpdateVesselTypeInVesselEvent> getPostUpdateEvent(Event updateReferenceEvent, HashMap<String, VesselEvent> vesselWithReferenceEvents) { ArrayList<VesselEvent> result = new ArrayList<>(); ArrayList<UpdateVesselTypeInVesselEvent> result = new ArrayList<>(); for (Map.Entry<String, VesselEvent> entry : vesselWithReferenceEvents.entrySet()) { Loading @@ -260,15 +322,13 @@ public class VesselEventStreams extends EventStreams { if (!vesselEvent.getVessel().getType().equals(vesselType)) { UpdateVesselEvent updateVesselEvent = new UpdateVesselEvent(); updateVesselEvent.setAggregateId(vesselEvent.getAggregateId()); updateVesselEvent.setUserId(updateReferenceEvent.getUserId()); updateVesselEvent.setVersion(vesselEvent.getVersion() + 1); UpdateVesselTypeInVesselEvent updateVesselType = new UpdateVesselTypeInVesselEvent(); updateVesselType.setAggregateId(vesselEvent.getAggregateId()); updateVesselType.setUserId(updateReferenceEvent.getUserId()); updateVesselType.setVersion(vesselEvent.getVersion() + 1); updateVesselType.setVesselType(((VesselTypeEvent) updateReferenceEvent).getVesselType()); result.add(updateVesselType); VesselDTO vessel = vesselEvent.getVessel(); vessel.setType(((VesselTypeEvent) updateReferenceEvent).getVesselType()); updateVesselEvent.setVessel(vessel); result.add(updateVesselEvent); } else { logger.debug("VesselType ya estaba actualizado o los campos indexados no han cambiado "); } Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTypeEventStreams.java +3 −3 Original line number Diff line number Diff line Loading @@ -106,7 +106,7 @@ public class VesselTypeEventStreams extends EventStreams { // último que se produzca por id) KStream<String, Event> successEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.CREATED.toString().equals(event.getType()) || VesselTypeEventTypes.VESSELTYPE_UPDATED.toString().equals(event.getType()))); || VesselTypeEventTypes.UPDATED.toString().equals(event.getType()))); processUpdateFailedStream(vesselTypeEvents, successEvents); Loading Loading @@ -151,7 +151,7 @@ public class VesselTypeEventStreams extends EventStreams { assert failedEvent.getType().equals(VesselTypeEventTypes.UPDATE_FAILED); assert lastSuccessEvent.getType().equals(VesselTypeEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselTypeEventTypes.VESSELTYPE_UPDATED); || lastSuccessEvent.getType().equals(VesselTypeEventTypes.UPDATED); VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType(); Loading @@ -171,7 +171,7 @@ public class VesselTypeEventStreams extends EventStreams { assert failedEvent.getType().equals(VesselTypeEventTypes.DELETE_FAILED); assert lastSuccessEvent.getType().equals(VesselTypeEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselTypeEventTypes.VESSELTYPE_UPDATED); || lastSuccessEvent.getType().equals(VesselTypeEventTypes.UPDATED); VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType(); Loading
vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vessel/VesselPostUpdateHandlerTest.java +73 −16 Original line number Diff line number Diff line Loading @@ -4,6 +4,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; Loading @@ -29,13 +31,18 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import es.redmic.brokerlib.alert.AlertType; import es.redmic.brokerlib.alert.Message; import es.redmic.brokerlib.avro.common.Event; import es.redmic.exception.common.ExceptionType; import es.redmic.test.vesselscommands.integration.vesseltype.VesselTypeDataUtil; import es.redmic.testutils.kafka.KafkaBaseIntegrationTest; import es.redmic.vesselscommands.VesselsCommandsApplication; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselCancelledEvent; import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselConfirmedEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselFailedEvent; import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent; import es.redmic.vesselslib.events.vesseltype.update.VesselTypeUpdatedEvent; @RunWith(SpringJUnit4ClassRunner.class) Loading Loading @@ -70,6 +77,10 @@ public class VesselPostUpdateHandlerTest extends KafkaBaseIntegrationTest { protected BlockingQueue<Object> blockingQueueForAlerts; protected BlockingQueue<Object> blockingQueueForUpdatedEvents; protected BlockingQueue<Object> blockingQueueForCancelledEvents; @PostConstruct public void VesselPostUpdateHandlerTestPostConstruct() throws Exception { Loading @@ -82,6 +93,10 @@ public class VesselPostUpdateHandlerTest extends KafkaBaseIntegrationTest { blockingQueue = new LinkedBlockingDeque<>(); blockingQueueForAlerts = new LinkedBlockingDeque<>(); blockingQueueForUpdatedEvents = new LinkedBlockingDeque<>(); blockingQueueForCancelledEvents = new LinkedBlockingDeque<>(); } // PostUpdate Loading @@ -98,58 +113,100 @@ public class VesselPostUpdateHandlerTest extends KafkaBaseIntegrationTest { // Envía created para que genere un evento postUpdate y lo saca de la cola VesselCreatedEvent vesselCreatedEvent = VesselDataUtil.getVesselCreatedEvent(mmsi + 7); kafkaTemplate.send(VESSEL_TOPIC, vesselCreatedEvent.getAggregateId(), vesselCreatedEvent); Thread.sleep(2000); // Envía created para que genere un evento postUpdate y lo saca de la cola VesselCreatedEvent vesselCreatedEvent2 = VesselDataUtil.getVesselCreatedEvent(mmsi + 8); kafkaTemplate.send(VESSEL_TOPIC, vesselCreatedEvent2.getAggregateId(), vesselCreatedEvent2); Thread.sleep(2000); // Envía created con vesselType actualizado para comprobar que no genera evento VesselCreatedEvent vesselCreatedEvent3 = VesselDataUtil.getVesselCreatedEvent(mmsi + 9); vesselCreatedEvent3.getVessel().setType(vesselTypeUpdatedEvent.getVesselType()); kafkaTemplate.send(VESSEL_TOPIC, vesselCreatedEvent3.getAggregateId(), vesselCreatedEvent3); Thread.sleep(2000); // Envía create para simular uno a medias en el stream y lo saca de la cola CreateVesselEvent createVesselEvent = VesselDataUtil.getCreateEvent(mmsi + 10); kafkaTemplate.send(VESSEL_TOPIC, createVesselEvent.getAggregateId(), createVesselEvent); Thread.sleep(2000); // Envía evento de vesselType actualizado para que genere los eventos de // postUpdate kafkaTemplate.send(VESSELTYPE_TOPIC, vesselTypeUpdatedEvent.getAggregateId(), vesselTypeUpdatedEvent); Thread.sleep(2000); Event update = (Event) blockingQueue.poll(30, TimeUnit.SECONDS); UpdateVesselTypeInVesselEvent update = (UpdateVesselTypeInVesselEvent) blockingQueue.poll(30, TimeUnit.SECONDS); assertNotNull(update); assertEquals(VesselEventTypes.UPDATE, update.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), ((UpdateVesselEvent) update).getVessel().getType()); assertEquals(vesselCreatedEvent.getAggregateId(), ((UpdateVesselEvent) update).getAggregateId()); assertEquals(VesselEventTypes.UPDATE_VESSELTYPE, update.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), update.getVesselType()); assertEquals(vesselCreatedEvent.getAggregateId(), update.getAggregateId()); Event update2 = (Event) blockingQueue.poll(30, TimeUnit.SECONDS); assertNotNull(update2); assertEquals(VesselEventTypes.UPDATE, update2.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), ((UpdateVesselEvent) update2).getVessel().getType()); assertEquals(vesselCreatedEvent2.getAggregateId(), ((UpdateVesselEvent) update2).getAggregateId()); // Envía confirmación para simular que view lo insertó kafkaTemplate.send(VESSEL_TOPIC, update.getAggregateId(), new UpdateVesselConfirmedEvent().buildFrom(update)); Event update3 = (Event) blockingQueue.poll(20, TimeUnit.SECONDS); UpdateVesselTypeInVesselEvent update2 = (UpdateVesselTypeInVesselEvent) blockingQueue.poll(30, TimeUnit.SECONDS); assertNotNull(update2); assertEquals(VesselEventTypes.UPDATE_VESSELTYPE, update2.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), update2.getVesselType()); assertEquals(vesselCreatedEvent2.getAggregateId(), update2.getAggregateId()); // Envía fallo para simular que view no lo insertó UpdateVesselFailedEvent updateVesselFailedEvent = new UpdateVesselFailedEvent().buildFrom(update2); updateVesselFailedEvent.setExceptionType(ExceptionType.ITEM_NOT_FOUND.name()); Map<String, String> arguments = new HashMap<>(); arguments.put("A", "B"); updateVesselFailedEvent.setArguments(arguments); kafkaTemplate.send(VESSEL_TOPIC, update2.getAggregateId(), updateVesselFailedEvent); Event update3 = (Event) blockingQueue.poll(30, TimeUnit.SECONDS); assertNull(update3); // LLegó un mensaje de alerta Message message = (Message) blockingQueueForAlerts.poll(30, TimeUnit.SECONDS); assertNotNull(message); assertEquals(AlertType.ERROR.name(), message.getType()); // Se modificó bien el primer vessel VesselUpdatedEvent updated = (VesselUpdatedEvent) blockingQueueForUpdatedEvents.poll(30, TimeUnit.SECONDS); assertNotNull(updated); assertEquals(VesselEventTypes.UPDATED, updated.getType()); assertEquals(vesselTypeUpdatedEvent.getVesselType(), updated.getVessel().getType()); assertEquals(vesselCreatedEvent.getAggregateId(), updated.getAggregateId()); // No se modificó bien el segundo vessel VesselCancelledEvent cancelled = (VesselCancelledEvent) blockingQueueForCancelledEvents.poll(30, TimeUnit.SECONDS); assertNotNull(cancelled); assertEquals(VesselEventTypes.UPDATE_CANCELLED, cancelled.getType()); assertEquals(vesselCreatedEvent2.getVessel().getType(), cancelled.getVessel().getType()); assertEquals(vesselCreatedEvent2.getAggregateId(), cancelled.getAggregateId()); // LLegó un mensaje de alerta Message message2 = (Message) blockingQueueForAlerts.poll(30, TimeUnit.SECONDS); assertNotNull(message2); assertEquals(AlertType.ERROR.name(), message2.getType()); } @KafkaHandler public void updateVesselEventFromPostUpdate(UpdateVesselEvent updateVesselEvent) { public void updateVesselEventFromPostUpdate(UpdateVesselTypeInVesselEvent updateVesselEvent) { blockingQueue.offer(updateVesselEvent); } @KafkaHandler public void vesselUpdatedEventFromPostUpdate(VesselUpdatedEvent vesselUpdatedEvent) { blockingQueueForUpdatedEvents.offer(vesselUpdatedEvent); } @KafkaHandler public void vesselCancelledEventFromPostUpdate(VesselCancelledEvent vesselCancelledEvent) { blockingQueueForCancelledEvents.offer(vesselCancelledEvent); } @KafkaListener(topics = "${broker.topic.alert}", groupId = "test") public void errorAlert(Message message) { blockingQueueForAlerts.offer(message); Loading
vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltype/VesselTypeDataUtil.java +1 −1 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ import es.redmic.vesselslib.events.vesseltype.update.VesselTypeUpdatedEvent; public abstract class VesselTypeDataUtil { public final static String PREFIX = "vesseltype-code-", USER = "1"; public final static String PREFIX = "vesseltype-code-", USER = "REDMIC_PROCESS"; public static CreateVesselTypeEvent getCreateEvent(String code) { Loading