Loading vessels-commands/src/main/java/es/redmic/vesselscommands/aggregate/VesselAggregate.java +21 −30 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ import es.redmic.vesselscommands.commands.DeleteVesselCommand; import es.redmic.vesselscommands.commands.UpdateVesselCommand; import es.redmic.vesselscommands.statestore.VesselStateStore; import es.redmic.vesselslib.dto.VesselDTO; import es.redmic.vesselslib.events.vessel.VesselEventType; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselEvent; import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.create.CreateVesselConfirmedEvent; Loading Loading @@ -75,7 +75,7 @@ public class VesselAggregate extends Aggregate { if (state == null) { logger.error("Intentando modificar un elemento del cual no se tiene historial, ", vesselId); throw new HistoryNotFoundException(VesselEventType.UPDATE_VESSEL.toString(), vesselId); throw new HistoryNotFoundException(VesselEventTypes.UPDATE.toString(), vesselId); } loadFromHistory(state); Loading @@ -85,7 +85,7 @@ public class VesselAggregate extends Aggregate { throw new ItemNotFoundException("id", vesselId); } if (itemIsLocked(state.getType())) { if (VesselEventTypes.isLocked(state.getType())) { logger.error("Intentando modificar un elemento bloqueado por una edición en curso, ", vesselId); throw new ItemLockedException("id", vesselId); } Loading @@ -106,7 +106,7 @@ public class VesselAggregate extends Aggregate { if (state == null) { logger.error("Intentando eliminar un elemento del cual no se tiene historial, " + vesselId); throw new HistoryNotFoundException(VesselEventType.DELETE_VESSEL.toString(), vesselId); throw new HistoryNotFoundException(VesselEventTypes.DELETE.toString(), vesselId); } loadFromHistory(state); Loading @@ -116,7 +116,7 @@ public class VesselAggregate extends Aggregate { throw new ItemNotFoundException("id", vesselId); } if (itemIsLocked(state.getType())) { if (VesselEventTypes.isLocked(state.getType())) { logger.error("Intentando eliminar un elemento bloqueado por una edición en curso, ", vesselId); throw new ItemLockedException("id", vesselId); } Loading Loading @@ -153,47 +153,47 @@ public class VesselAggregate extends Aggregate { String eventType = history.getType(); switch (VesselEventType.valueOf(eventType)) { case CREATE_VESSEL: switch (eventType) { case "CREATE": apply((CreateVesselEvent) history); break; case CREATE_VESSEL_CONFIRMED: case "CREATE_CONFIRMED": apply((CreateVesselConfirmedEvent) history); break; case VESSEL_CREATED: case "CREATED": apply((VesselCreatedEvent) history); break; case UPDATE_VESSEL: case "UPDATE": apply((UpdateVesselEvent) history); break; case UPDATE_VESSEL_CONFIRMED: case "UPDATE_CONFIRMED": apply((UpdateVesselConfirmedEvent) history); break; case VESSEL_UPDATED: case "UPDATED": apply((VesselUpdatedEvent) history); break; case DELETE_VESSEL: case "DELETE": apply((DeleteVesselEvent) history); break; case DELETE_VESSEL_CONFIRMED: case "DELETE_CONFIRMED": apply((DeleteVesselConfirmedEvent) history); break; case VESSEL_DELETED: case "DELETED": apply((VesselDeletedEvent) history); break; // FAILED case CREATE_VESSEL_FAILED: case UPDATE_VESSEL_FAILED: case DELETE_VESSEL_FAILED: case "CREATE_FAILED": case "UPDATE_FAILED": case "DELETE_FAILED": logger.debug("Evento fallido"); _apply((SimpleEvent) history); break; // CANCELLED case CREATE_VESSEL_CANCELLED: case "CREATE_CANCELLED": apply((CreateVesselCancelledEvent) history); break; case UPDATE_VESSEL_CANCELLED: case DELETE_VESSEL_CANCELLED: case "UPDATE_CANCELLED": case "DELETE_CANCELLED": logger.debug("Compensación por edición/borrado fallido"); _apply((VesselEvent) history); break; Loading Loading @@ -269,13 +269,4 @@ public class VesselAggregate extends Aggregate { this.vessel = null; super.reset(); } private boolean itemIsLocked(String eventType) { return !(eventType.equals(VesselEventType.VESSEL_CREATED.toString()) || eventType.equals(VesselEventType.VESSEL_UPDATED.toString()) || eventType.equals(VesselEventType.CREATE_VESSEL_CANCELLED.toString()) || eventType.equals(VesselEventType.UPDATE_VESSEL_CANCELLED.toString()) || eventType.equals(VesselEventType.DELETE_VESSEL_CANCELLED.toString())); } } vessels-commands/src/main/java/es/redmic/vesselscommands/aggregate/VesselTypeAggregate.java +21 −30 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ import es.redmic.vesselscommands.commands.DeleteVesselTypeCommand; import es.redmic.vesselscommands.commands.UpdateVesselTypeCommand; import es.redmic.vesselscommands.statestore.VesselTypeStateStore; import es.redmic.vesselslib.dto.VesselTypeDTO; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventType; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeConfirmedEvent; Loading Loading @@ -74,7 +74,7 @@ public class VesselTypeAggregate extends Aggregate { if (state == null) { logger.error("Intentando modificar un elemento del cual no se tiene historial", vesselTypeId); throw new HistoryNotFoundException(VesselTypeEventType.UPDATE_VESSELTYPE.toString(), vesselTypeId); throw new HistoryNotFoundException(VesselTypeEventTypes.UPDATE, vesselTypeId); } loadFromHistory(state); Loading @@ -84,7 +84,7 @@ public class VesselTypeAggregate extends Aggregate { throw new ItemNotFoundException("id", vesselTypeId); } if (itemIsLocked(state.getType())) { if (VesselTypeEventTypes.isLocked(state.getType())) { logger.error("Intentando modificar un elemento bloqueado por una edición en curso, ", vesselTypeId); throw new ItemLockedException("id", vesselTypeId); } Loading @@ -105,7 +105,7 @@ public class VesselTypeAggregate extends Aggregate { if (state == null) { logger.error("Intentando eliminar un elemento del cual no se tiene historial, " + vesselTypeId); throw new HistoryNotFoundException(VesselTypeEventType.UPDATE_VESSELTYPE.toString(), vesselTypeId); throw new HistoryNotFoundException(VesselTypeEventTypes.UPDATE, vesselTypeId); } loadFromHistory(state); Loading @@ -115,7 +115,7 @@ public class VesselTypeAggregate extends Aggregate { throw new ItemNotFoundException("id", vesselTypeId); } if (itemIsLocked(state.getType())) { if (VesselTypeEventTypes.isLocked(state.getType())) { logger.error("Intentando eliminar un elemento bloqueado por una edición en curso, ", vesselTypeId); throw new ItemLockedException("id", vesselTypeId); } Loading Loading @@ -152,47 +152,47 @@ public class VesselTypeAggregate extends Aggregate { String eventType = history.getType(); switch (VesselTypeEventType.valueOf(eventType)) { case CREATE_VESSELTYPE: switch (eventType) { case "CREATE": apply((CreateVesselTypeEvent) history); break; case CREATE_VESSELTYPE_CONFIRMED: case "CREATE_CONFIRMED": apply((CreateVesselTypeConfirmedEvent) history); break; case VESSELTYPE_CREATED: case "CREATED": apply((VesselTypeCreatedEvent) history); break; case UPDATE_VESSELTYPE: case "UPDATE": apply((UpdateVesselTypeEvent) history); break; case UPDATE_VESSELTYPE_CONFIRMED: case "UPDATE_CONFIRMED": apply((UpdateVesselTypeConfirmedEvent) history); break; case VESSELTYPE_UPDATED: case "UPDATED": apply((VesselTypeUpdatedEvent) history); break; case DELETE_VESSELTYPE: case "DELETE": apply((DeleteVesselTypeEvent) history); break; case DELETE_VESSELTYPE_CONFIRMED: case "DELETE_CONFIRMED": apply((DeleteVesselTypeConfirmedEvent) history); break; case VESSELTYPE_DELETED: case "DELETED": apply((VesselTypeDeletedEvent) history); break; // FAILED case CREATE_VESSELTYPE_FAILED: case UPDATE_VESSELTYPE_FAILED: case DELETE_VESSELTYPE_FAILED: case "CREATE_FAILED": case "UPDATE_FAILED": case "DELETE_FAILED": logger.debug("Evento fallido"); _apply((SimpleEvent) history); break; // CANCELLED case CREATE_VESSELTYPE_CANCELLED: case "CREATE_CANCELLED": apply((CreateVesselTypeCancelledEvent) history); break; case UPDATE_VESSELTYPE_CANCELLED: case DELETE_VESSELTYPE_CANCELLED: case "UPDATE_CANCELLED": case "DELETE_CANCELLED": logger.debug("Compensación por edición/borrado fallido"); _apply((VesselTypeEvent) history); break; Loading Loading @@ -267,13 +267,4 @@ public class VesselTypeAggregate extends Aggregate { this.vesselType = null; super.reset(); } private boolean itemIsLocked(String eventType) { return !(eventType.equals(VesselTypeEventType.VESSELTYPE_CREATED.toString()) || eventType.equals(VesselTypeEventType.VESSELTYPE_UPDATED.toString()) || eventType.equals(VesselTypeEventType.CREATE_VESSELTYPE_CANCELLED.toString()) || eventType.equals(VesselTypeEventType.UPDATE_VESSELTYPE_CANCELLED.toString()) || eventType.equals(VesselTypeEventType.DELETE_VESSELTYPE_CANCELLED.toString())); } } vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselEventStreams.java +23 −32 Original line number Diff line number Diff line Loading @@ -18,14 +18,14 @@ import es.redmic.commandslib.statestore.StreamConfig; import es.redmic.commandslib.streams.EventStreams; import es.redmic.vesselslib.dto.VesselDTO; import es.redmic.vesselslib.dto.VesselTypeDTO; import es.redmic.vesselslib.events.vessel.VesselEventType; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselEvent; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent; import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventType; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; public class VesselEventStreams extends EventStreams { Loading @@ -45,11 +45,11 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de confirmación al crear KStream<String, Event> createConfirmedEvents = vesselEvents .filter((id, event) -> (VesselEventType.CREATE_VESSEL_CONFIRMED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.CREATE_CONFIRMED.equals(event.getType()))); // Stream filtrado por eventos de petición de crear KStream<String, Event> createRequestEvents = vesselEvents .filter((id, event) -> (VesselEventType.CREATE_VESSEL.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.CREATE.equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito createConfirmedEvents.join(createRequestEvents, Loading @@ -59,9 +59,9 @@ public class VesselEventStreams extends EventStreams { private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) { assert requestEvent.getType().equals(VesselEventType.CREATE_VESSEL.name()); assert requestEvent.getType().equals(VesselEventTypes.CREATE); assert confirmedEvent.getType().equals(VesselEventType.CREATE_VESSEL_CONFIRMED.name()); assert confirmedEvent.getType().equals(VesselEventTypes.CREATE_CONFIRMED); if (!isSameSession(confirmedEvent, requestEvent)) { return null; Loading @@ -81,11 +81,11 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de confirmación al modificar KStream<String, Event> updateConfirmedEvents = vesselEvents .filter((id, event) -> (VesselEventType.UPDATE_VESSEL_CONFIRMED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.UPDATE_CONFIRMED.equals(event.getType()))); // Stream filtrado por eventos de petición de modificar KStream<String, Event> updateRequestEvents = vesselEvents .filter((id, event) -> (VesselEventType.UPDATE_VESSEL.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.UPDATE.equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito updateConfirmedEvents.join(updateRequestEvents, Loading @@ -95,9 +95,9 @@ public class VesselEventStreams extends EventStreams { private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) { assert requestEvent.getType().equals(VesselEventType.UPDATE_VESSEL.name()); assert requestEvent.getType().equals(VesselEventTypes.UPDATE); assert confirmedEvent.getType().equals(VesselEventType.UPDATE_VESSEL_CONFIRMED.name()); assert confirmedEvent.getType().equals(VesselEventTypes.UPDATE_CONFIRMED); if (!isSameSession(confirmedEvent, requestEvent)) { return null; Loading @@ -118,8 +118,8 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de creaciones y modificaciones correctos (solo el // último que se produzca por id) KStream<String, Event> successEvents = vesselEvents .filter((id, event) -> (VesselEventType.VESSEL_CREATED.toString().equals(event.getType()) || VesselEventType.VESSEL_UPDATED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.CREATED.equals(event.getType()) || VesselEventTypes.UPDATED.equals(event.getType()))); processUpdateFailedStream(vesselEvents, successEvents); Loading @@ -131,7 +131,7 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de fallo al modificar KStream<String, Event> failedEvents = vesselEvents .filter((id, event) -> (VesselEventType.UPDATE_VESSEL_FAILED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.UPDATE_FAILED.equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); Loading @@ -147,7 +147,7 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de fallo al borrar KStream<String, Event> failedEvents = vesselEvents .filter((id, event) -> (VesselEventType.DELETE_VESSEL_FAILED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.DELETE_FAILED.equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); Loading @@ -160,10 +160,10 @@ public class VesselEventStreams extends EventStreams { private Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) { assert lastSuccessEvent.getType().equals(VesselEventType.VESSEL_CREATED.name()) || lastSuccessEvent.getType().equals(VesselEventType.VESSEL_UPDATED.name()); assert lastSuccessEvent.getType().equals(VesselEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselEventTypes.UPDATED); assert failedEvent.getType().equals(VesselEventType.UPDATE_VESSEL_FAILED.name()); assert failedEvent.getType().equals(VesselEventTypes.UPDATE_FAILED); VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel(); Loading @@ -180,10 +180,10 @@ public class VesselEventStreams extends EventStreams { private Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) { assert lastSuccessEvent.getType().equals(VesselEventType.VESSEL_CREATED.name()) || lastSuccessEvent.getType().equals(VesselEventType.VESSEL_UPDATED.name()); assert lastSuccessEvent.getType().equals(VesselEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselEventTypes.UPDATED); assert failedEvent.getType().equals(VesselEventType.DELETE_VESSEL_FAILED.name()); assert failedEvent.getType().equals(VesselEventTypes.DELETE_FAILED); VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel(); Loading Loading @@ -222,7 +222,7 @@ public class VesselEventStreams extends EventStreams { KStream<String, Event> vesselTypeEvents = builder.stream(vesselTypeTopic); KStream<String, Event> updateReferenceEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.VESSELTYPE_UPDATED.toString().equals(event.getType()))); .filter((id, event) -> (VesselTypeEventTypes.UPDATED.equals(event.getType()))); KStream<String, ArrayList<VesselEvent>> join = updateReferenceEvents.join(vesselEventsTable, (updateReferenceEvent, vesselWithReferenceEvents) -> getPostUpdateEvent(updateReferenceEvent, Loading @@ -242,9 +242,9 @@ public class VesselEventStreams extends EventStreams { VesselEvent vesselEvent = entry.getValue(); VesselTypeDTO vesselType = ((VesselTypeEvent) updateReferenceEvent).getVesselType(); if (itemIsLocked(vesselEvent.getType())) { if (VesselEventTypes.isLocked(vesselEvent.getType())) { if (!vesselEvent.getType().equals(VesselEventType.VESSEL_DELETED.toString())) { if (!vesselEvent.getType().equals(VesselEventTypes.DELETED)) { String message = "Item con id " + vesselEvent.getAggregateId() + " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia " + updateReferenceEvent.getAggregateId(); Loading Loading @@ -276,13 +276,4 @@ public class VesselEventStreams extends EventStreams { } return result; } private boolean itemIsLocked(String type) { return !(VesselEventType.VESSEL_CREATED.toString().equals(type) || VesselEventType.VESSEL_UPDATED.toString().equals(type) || VesselEventType.CREATE_VESSEL_CANCELLED.toString().equals(type) || VesselEventType.UPDATE_VESSEL_CANCELLED.toString().equals(type) || VesselEventType.DELETE_VESSEL_CANCELLED.toString().equals(type)); } } vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTypeEventStreams.java +23 −23 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ import es.redmic.brokerlib.avro.common.EventError; import es.redmic.commandslib.statestore.StreamConfig; import es.redmic.commandslib.streams.EventStreams; import es.redmic.vesselslib.dto.VesselTypeDTO; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventType; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent; Loading @@ -29,12 +29,12 @@ public class VesselTypeEventStreams extends EventStreams { protected void processCreatedStream(KStream<String, Event> vesselTypeEvents) { // Stream filtrado por eventos de confirmación al crear KStream<String, Event> createConfirmedEvents = vesselTypeEvents.filter( (id, event) -> (VesselTypeEventType.CREATE_VESSELTYPE_CONFIRMED.toString().equals(event.getType()))); KStream<String, Event> createConfirmedEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.CREATE_CONFIRMED.toString().equals(event.getType()))); // Stream filtrado por eventos de petición de crear KStream<String, Event> createRequestEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.CREATE_VESSELTYPE.toString().equals(event.getType()))); .filter((id, event) -> (VesselTypeEventTypes.CREATE.toString().equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito createConfirmedEvents.join(createRequestEvents, Loading @@ -44,9 +44,9 @@ public class VesselTypeEventStreams extends EventStreams { private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) { assert requestEvent.getType().equals(VesselTypeEventType.CREATE_VESSELTYPE.name()); assert requestEvent.getType().equals(VesselTypeEventTypes.CREATE); assert confirmedEvent.getType().equals(VesselTypeEventType.CREATE_VESSELTYPE_CONFIRMED.name()); assert confirmedEvent.getType().equals(VesselTypeEventTypes.CREATE_CONFIRMED); if (!isSameSession(confirmedEvent, requestEvent)) { return null; Loading @@ -65,12 +65,12 @@ public class VesselTypeEventStreams extends EventStreams { protected void processUpdatedStream(KStream<String, Event> vesselTypeEvents) { // Stream filtrado por eventos de confirmación al modificar KStream<String, Event> updateConfirmedEvents = vesselTypeEvents.filter( (id, event) -> (VesselTypeEventType.UPDATE_VESSELTYPE_CONFIRMED.toString().equals(event.getType()))); KStream<String, Event> updateConfirmedEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.UPDATE_CONFIRMED.toString().equals(event.getType()))); // Stream filtrado por eventos de petición de modificar KStream<String, Event> updateRequestEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.UPDATE_VESSELTYPE.toString().equals(event.getType()))); .filter((id, event) -> (VesselTypeEventTypes.UPDATE.toString().equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito updateConfirmedEvents.join(updateRequestEvents, Loading @@ -80,9 +80,9 @@ public class VesselTypeEventStreams extends EventStreams { private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) { assert requestEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE.name()); assert requestEvent.getType().equals(VesselTypeEventTypes.UPDATE); assert confirmedEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE_CONFIRMED.name()); assert confirmedEvent.getType().equals(VesselTypeEventTypes.UPDATE_CONFIRMED); if (!isSameSession(confirmedEvent, requestEvent)) { return null; Loading @@ -105,8 +105,8 @@ public class VesselTypeEventStreams extends EventStreams { // Stream filtrado por eventos de creaciones y modificaciones correctos (solo el // último que se produzca por id) KStream<String, Event> successEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.VESSELTYPE_CREATED.toString().equals(event.getType()) || VesselTypeEventType.VESSELTYPE_UPDATED.toString().equals(event.getType()))); .filter((id, event) -> (VesselTypeEventTypes.CREATED.toString().equals(event.getType()) || VesselTypeEventTypes.VESSELTYPE_UPDATED.toString().equals(event.getType()))); processUpdateFailedStream(vesselTypeEvents, successEvents); Loading @@ -118,8 +118,8 @@ public class VesselTypeEventStreams extends EventStreams { KStream<String, Event> successEvents) { // Stream filtrado por eventos de fallo al modificar KStream<String, Event> failedEvents = vesselTypeEvents.filter( (id, event) -> (VesselTypeEventType.UPDATE_VESSELTYPE_FAILED.toString().equals(event.getType()))); KStream<String, Event> failedEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.UPDATE_FAILED.toString().equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); Loading @@ -134,8 +134,8 @@ public class VesselTypeEventStreams extends EventStreams { KStream<String, Event> successEvents) { // Stream filtrado por eventos de fallo al borrar KStream<String, Event> failedEvents = vesselTypeEvents.filter( (id, event) -> (VesselTypeEventType.DELETE_VESSELTYPE_FAILED.toString().equals(event.getType()))); KStream<String, Event> failedEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.DELETE_FAILED.toString().equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); Loading @@ -148,10 +148,10 @@ public class VesselTypeEventStreams extends EventStreams { private Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) { assert failedEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE_FAILED.name()); assert failedEvent.getType().equals(VesselTypeEventTypes.UPDATE_FAILED); assert lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_CREATED.name()) || lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_UPDATED.name()); assert lastSuccessEvent.getType().equals(VesselTypeEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselTypeEventTypes.VESSELTYPE_UPDATED); VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType(); Loading @@ -168,10 +168,10 @@ public class VesselTypeEventStreams extends EventStreams { private Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) { assert failedEvent.getType().equals(VesselTypeEventType.DELETE_VESSELTYPE_FAILED.name()); assert failedEvent.getType().equals(VesselTypeEventTypes.DELETE_FAILED); assert lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_CREATED.name()) || lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_UPDATED.name()); assert lastSuccessEvent.getType().equals(VesselTypeEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselTypeEventTypes.VESSELTYPE_UPDATED); VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType(); Loading Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/aggregate/VesselAggregate.java +21 −30 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ import es.redmic.vesselscommands.commands.DeleteVesselCommand; import es.redmic.vesselscommands.commands.UpdateVesselCommand; import es.redmic.vesselscommands.statestore.VesselStateStore; import es.redmic.vesselslib.dto.VesselDTO; import es.redmic.vesselslib.events.vessel.VesselEventType; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselEvent; import es.redmic.vesselslib.events.vessel.create.CreateVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.create.CreateVesselConfirmedEvent; Loading Loading @@ -75,7 +75,7 @@ public class VesselAggregate extends Aggregate { if (state == null) { logger.error("Intentando modificar un elemento del cual no se tiene historial, ", vesselId); throw new HistoryNotFoundException(VesselEventType.UPDATE_VESSEL.toString(), vesselId); throw new HistoryNotFoundException(VesselEventTypes.UPDATE.toString(), vesselId); } loadFromHistory(state); Loading @@ -85,7 +85,7 @@ public class VesselAggregate extends Aggregate { throw new ItemNotFoundException("id", vesselId); } if (itemIsLocked(state.getType())) { if (VesselEventTypes.isLocked(state.getType())) { logger.error("Intentando modificar un elemento bloqueado por una edición en curso, ", vesselId); throw new ItemLockedException("id", vesselId); } Loading @@ -106,7 +106,7 @@ public class VesselAggregate extends Aggregate { if (state == null) { logger.error("Intentando eliminar un elemento del cual no se tiene historial, " + vesselId); throw new HistoryNotFoundException(VesselEventType.DELETE_VESSEL.toString(), vesselId); throw new HistoryNotFoundException(VesselEventTypes.DELETE.toString(), vesselId); } loadFromHistory(state); Loading @@ -116,7 +116,7 @@ public class VesselAggregate extends Aggregate { throw new ItemNotFoundException("id", vesselId); } if (itemIsLocked(state.getType())) { if (VesselEventTypes.isLocked(state.getType())) { logger.error("Intentando eliminar un elemento bloqueado por una edición en curso, ", vesselId); throw new ItemLockedException("id", vesselId); } Loading Loading @@ -153,47 +153,47 @@ public class VesselAggregate extends Aggregate { String eventType = history.getType(); switch (VesselEventType.valueOf(eventType)) { case CREATE_VESSEL: switch (eventType) { case "CREATE": apply((CreateVesselEvent) history); break; case CREATE_VESSEL_CONFIRMED: case "CREATE_CONFIRMED": apply((CreateVesselConfirmedEvent) history); break; case VESSEL_CREATED: case "CREATED": apply((VesselCreatedEvent) history); break; case UPDATE_VESSEL: case "UPDATE": apply((UpdateVesselEvent) history); break; case UPDATE_VESSEL_CONFIRMED: case "UPDATE_CONFIRMED": apply((UpdateVesselConfirmedEvent) history); break; case VESSEL_UPDATED: case "UPDATED": apply((VesselUpdatedEvent) history); break; case DELETE_VESSEL: case "DELETE": apply((DeleteVesselEvent) history); break; case DELETE_VESSEL_CONFIRMED: case "DELETE_CONFIRMED": apply((DeleteVesselConfirmedEvent) history); break; case VESSEL_DELETED: case "DELETED": apply((VesselDeletedEvent) history); break; // FAILED case CREATE_VESSEL_FAILED: case UPDATE_VESSEL_FAILED: case DELETE_VESSEL_FAILED: case "CREATE_FAILED": case "UPDATE_FAILED": case "DELETE_FAILED": logger.debug("Evento fallido"); _apply((SimpleEvent) history); break; // CANCELLED case CREATE_VESSEL_CANCELLED: case "CREATE_CANCELLED": apply((CreateVesselCancelledEvent) history); break; case UPDATE_VESSEL_CANCELLED: case DELETE_VESSEL_CANCELLED: case "UPDATE_CANCELLED": case "DELETE_CANCELLED": logger.debug("Compensación por edición/borrado fallido"); _apply((VesselEvent) history); break; Loading Loading @@ -269,13 +269,4 @@ public class VesselAggregate extends Aggregate { this.vessel = null; super.reset(); } private boolean itemIsLocked(String eventType) { return !(eventType.equals(VesselEventType.VESSEL_CREATED.toString()) || eventType.equals(VesselEventType.VESSEL_UPDATED.toString()) || eventType.equals(VesselEventType.CREATE_VESSEL_CANCELLED.toString()) || eventType.equals(VesselEventType.UPDATE_VESSEL_CANCELLED.toString()) || eventType.equals(VesselEventType.DELETE_VESSEL_CANCELLED.toString())); } }
vessels-commands/src/main/java/es/redmic/vesselscommands/aggregate/VesselTypeAggregate.java +21 −30 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ import es.redmic.vesselscommands.commands.DeleteVesselTypeCommand; import es.redmic.vesselscommands.commands.UpdateVesselTypeCommand; import es.redmic.vesselscommands.statestore.VesselTypeStateStore; import es.redmic.vesselslib.dto.VesselTypeDTO; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventType; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEvent; import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeConfirmedEvent; Loading Loading @@ -74,7 +74,7 @@ public class VesselTypeAggregate extends Aggregate { if (state == null) { logger.error("Intentando modificar un elemento del cual no se tiene historial", vesselTypeId); throw new HistoryNotFoundException(VesselTypeEventType.UPDATE_VESSELTYPE.toString(), vesselTypeId); throw new HistoryNotFoundException(VesselTypeEventTypes.UPDATE, vesselTypeId); } loadFromHistory(state); Loading @@ -84,7 +84,7 @@ public class VesselTypeAggregate extends Aggregate { throw new ItemNotFoundException("id", vesselTypeId); } if (itemIsLocked(state.getType())) { if (VesselTypeEventTypes.isLocked(state.getType())) { logger.error("Intentando modificar un elemento bloqueado por una edición en curso, ", vesselTypeId); throw new ItemLockedException("id", vesselTypeId); } Loading @@ -105,7 +105,7 @@ public class VesselTypeAggregate extends Aggregate { if (state == null) { logger.error("Intentando eliminar un elemento del cual no se tiene historial, " + vesselTypeId); throw new HistoryNotFoundException(VesselTypeEventType.UPDATE_VESSELTYPE.toString(), vesselTypeId); throw new HistoryNotFoundException(VesselTypeEventTypes.UPDATE, vesselTypeId); } loadFromHistory(state); Loading @@ -115,7 +115,7 @@ public class VesselTypeAggregate extends Aggregate { throw new ItemNotFoundException("id", vesselTypeId); } if (itemIsLocked(state.getType())) { if (VesselTypeEventTypes.isLocked(state.getType())) { logger.error("Intentando eliminar un elemento bloqueado por una edición en curso, ", vesselTypeId); throw new ItemLockedException("id", vesselTypeId); } Loading Loading @@ -152,47 +152,47 @@ public class VesselTypeAggregate extends Aggregate { String eventType = history.getType(); switch (VesselTypeEventType.valueOf(eventType)) { case CREATE_VESSELTYPE: switch (eventType) { case "CREATE": apply((CreateVesselTypeEvent) history); break; case CREATE_VESSELTYPE_CONFIRMED: case "CREATE_CONFIRMED": apply((CreateVesselTypeConfirmedEvent) history); break; case VESSELTYPE_CREATED: case "CREATED": apply((VesselTypeCreatedEvent) history); break; case UPDATE_VESSELTYPE: case "UPDATE": apply((UpdateVesselTypeEvent) history); break; case UPDATE_VESSELTYPE_CONFIRMED: case "UPDATE_CONFIRMED": apply((UpdateVesselTypeConfirmedEvent) history); break; case VESSELTYPE_UPDATED: case "UPDATED": apply((VesselTypeUpdatedEvent) history); break; case DELETE_VESSELTYPE: case "DELETE": apply((DeleteVesselTypeEvent) history); break; case DELETE_VESSELTYPE_CONFIRMED: case "DELETE_CONFIRMED": apply((DeleteVesselTypeConfirmedEvent) history); break; case VESSELTYPE_DELETED: case "DELETED": apply((VesselTypeDeletedEvent) history); break; // FAILED case CREATE_VESSELTYPE_FAILED: case UPDATE_VESSELTYPE_FAILED: case DELETE_VESSELTYPE_FAILED: case "CREATE_FAILED": case "UPDATE_FAILED": case "DELETE_FAILED": logger.debug("Evento fallido"); _apply((SimpleEvent) history); break; // CANCELLED case CREATE_VESSELTYPE_CANCELLED: case "CREATE_CANCELLED": apply((CreateVesselTypeCancelledEvent) history); break; case UPDATE_VESSELTYPE_CANCELLED: case DELETE_VESSELTYPE_CANCELLED: case "UPDATE_CANCELLED": case "DELETE_CANCELLED": logger.debug("Compensación por edición/borrado fallido"); _apply((VesselTypeEvent) history); break; Loading Loading @@ -267,13 +267,4 @@ public class VesselTypeAggregate extends Aggregate { this.vesselType = null; super.reset(); } private boolean itemIsLocked(String eventType) { return !(eventType.equals(VesselTypeEventType.VESSELTYPE_CREATED.toString()) || eventType.equals(VesselTypeEventType.VESSELTYPE_UPDATED.toString()) || eventType.equals(VesselTypeEventType.CREATE_VESSELTYPE_CANCELLED.toString()) || eventType.equals(VesselTypeEventType.UPDATE_VESSELTYPE_CANCELLED.toString()) || eventType.equals(VesselTypeEventType.DELETE_VESSELTYPE_CANCELLED.toString())); } }
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselEventStreams.java +23 −32 Original line number Diff line number Diff line Loading @@ -18,14 +18,14 @@ import es.redmic.commandslib.statestore.StreamConfig; import es.redmic.commandslib.streams.EventStreams; import es.redmic.vesselslib.dto.VesselDTO; import es.redmic.vesselslib.dto.VesselTypeDTO; import es.redmic.vesselslib.events.vessel.VesselEventType; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselEvent; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselCancelledEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEvent; import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventType; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; public class VesselEventStreams extends EventStreams { Loading @@ -45,11 +45,11 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de confirmación al crear KStream<String, Event> createConfirmedEvents = vesselEvents .filter((id, event) -> (VesselEventType.CREATE_VESSEL_CONFIRMED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.CREATE_CONFIRMED.equals(event.getType()))); // Stream filtrado por eventos de petición de crear KStream<String, Event> createRequestEvents = vesselEvents .filter((id, event) -> (VesselEventType.CREATE_VESSEL.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.CREATE.equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito createConfirmedEvents.join(createRequestEvents, Loading @@ -59,9 +59,9 @@ public class VesselEventStreams extends EventStreams { private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) { assert requestEvent.getType().equals(VesselEventType.CREATE_VESSEL.name()); assert requestEvent.getType().equals(VesselEventTypes.CREATE); assert confirmedEvent.getType().equals(VesselEventType.CREATE_VESSEL_CONFIRMED.name()); assert confirmedEvent.getType().equals(VesselEventTypes.CREATE_CONFIRMED); if (!isSameSession(confirmedEvent, requestEvent)) { return null; Loading @@ -81,11 +81,11 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de confirmación al modificar KStream<String, Event> updateConfirmedEvents = vesselEvents .filter((id, event) -> (VesselEventType.UPDATE_VESSEL_CONFIRMED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.UPDATE_CONFIRMED.equals(event.getType()))); // Stream filtrado por eventos de petición de modificar KStream<String, Event> updateRequestEvents = vesselEvents .filter((id, event) -> (VesselEventType.UPDATE_VESSEL.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.UPDATE.equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito updateConfirmedEvents.join(updateRequestEvents, Loading @@ -95,9 +95,9 @@ public class VesselEventStreams extends EventStreams { private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) { assert requestEvent.getType().equals(VesselEventType.UPDATE_VESSEL.name()); assert requestEvent.getType().equals(VesselEventTypes.UPDATE); assert confirmedEvent.getType().equals(VesselEventType.UPDATE_VESSEL_CONFIRMED.name()); assert confirmedEvent.getType().equals(VesselEventTypes.UPDATE_CONFIRMED); if (!isSameSession(confirmedEvent, requestEvent)) { return null; Loading @@ -118,8 +118,8 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de creaciones y modificaciones correctos (solo el // último que se produzca por id) KStream<String, Event> successEvents = vesselEvents .filter((id, event) -> (VesselEventType.VESSEL_CREATED.toString().equals(event.getType()) || VesselEventType.VESSEL_UPDATED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.CREATED.equals(event.getType()) || VesselEventTypes.UPDATED.equals(event.getType()))); processUpdateFailedStream(vesselEvents, successEvents); Loading @@ -131,7 +131,7 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de fallo al modificar KStream<String, Event> failedEvents = vesselEvents .filter((id, event) -> (VesselEventType.UPDATE_VESSEL_FAILED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.UPDATE_FAILED.equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); Loading @@ -147,7 +147,7 @@ public class VesselEventStreams extends EventStreams { // Stream filtrado por eventos de fallo al borrar KStream<String, Event> failedEvents = vesselEvents .filter((id, event) -> (VesselEventType.DELETE_VESSEL_FAILED.toString().equals(event.getType()))); .filter((id, event) -> (VesselEventTypes.DELETE_FAILED.equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); Loading @@ -160,10 +160,10 @@ public class VesselEventStreams extends EventStreams { private Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) { assert lastSuccessEvent.getType().equals(VesselEventType.VESSEL_CREATED.name()) || lastSuccessEvent.getType().equals(VesselEventType.VESSEL_UPDATED.name()); assert lastSuccessEvent.getType().equals(VesselEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselEventTypes.UPDATED); assert failedEvent.getType().equals(VesselEventType.UPDATE_VESSEL_FAILED.name()); assert failedEvent.getType().equals(VesselEventTypes.UPDATE_FAILED); VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel(); Loading @@ -180,10 +180,10 @@ public class VesselEventStreams extends EventStreams { private Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) { assert lastSuccessEvent.getType().equals(VesselEventType.VESSEL_CREATED.name()) || lastSuccessEvent.getType().equals(VesselEventType.VESSEL_UPDATED.name()); assert lastSuccessEvent.getType().equals(VesselEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselEventTypes.UPDATED); assert failedEvent.getType().equals(VesselEventType.DELETE_VESSEL_FAILED.name()); assert failedEvent.getType().equals(VesselEventTypes.DELETE_FAILED); VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel(); Loading Loading @@ -222,7 +222,7 @@ public class VesselEventStreams extends EventStreams { KStream<String, Event> vesselTypeEvents = builder.stream(vesselTypeTopic); KStream<String, Event> updateReferenceEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.VESSELTYPE_UPDATED.toString().equals(event.getType()))); .filter((id, event) -> (VesselTypeEventTypes.UPDATED.equals(event.getType()))); KStream<String, ArrayList<VesselEvent>> join = updateReferenceEvents.join(vesselEventsTable, (updateReferenceEvent, vesselWithReferenceEvents) -> getPostUpdateEvent(updateReferenceEvent, Loading @@ -242,9 +242,9 @@ public class VesselEventStreams extends EventStreams { VesselEvent vesselEvent = entry.getValue(); VesselTypeDTO vesselType = ((VesselTypeEvent) updateReferenceEvent).getVesselType(); if (itemIsLocked(vesselEvent.getType())) { if (VesselEventTypes.isLocked(vesselEvent.getType())) { if (!vesselEvent.getType().equals(VesselEventType.VESSEL_DELETED.toString())) { if (!vesselEvent.getType().equals(VesselEventTypes.DELETED)) { String message = "Item con id " + vesselEvent.getAggregateId() + " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia " + updateReferenceEvent.getAggregateId(); Loading Loading @@ -276,13 +276,4 @@ public class VesselEventStreams extends EventStreams { } return result; } private boolean itemIsLocked(String type) { return !(VesselEventType.VESSEL_CREATED.toString().equals(type) || VesselEventType.VESSEL_UPDATED.toString().equals(type) || VesselEventType.CREATE_VESSEL_CANCELLED.toString().equals(type) || VesselEventType.UPDATE_VESSEL_CANCELLED.toString().equals(type) || VesselEventType.DELETE_VESSEL_CANCELLED.toString().equals(type)); } }
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTypeEventStreams.java +23 −23 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ import es.redmic.brokerlib.avro.common.EventError; import es.redmic.commandslib.statestore.StreamConfig; import es.redmic.commandslib.streams.EventStreams; import es.redmic.vesselslib.dto.VesselTypeDTO; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventType; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; import es.redmic.vesselslib.events.vesseltype.create.VesselTypeCreatedEvent; import es.redmic.vesselslib.events.vesseltype.delete.DeleteVesselTypeCancelledEvent; Loading @@ -29,12 +29,12 @@ public class VesselTypeEventStreams extends EventStreams { protected void processCreatedStream(KStream<String, Event> vesselTypeEvents) { // Stream filtrado por eventos de confirmación al crear KStream<String, Event> createConfirmedEvents = vesselTypeEvents.filter( (id, event) -> (VesselTypeEventType.CREATE_VESSELTYPE_CONFIRMED.toString().equals(event.getType()))); KStream<String, Event> createConfirmedEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.CREATE_CONFIRMED.toString().equals(event.getType()))); // Stream filtrado por eventos de petición de crear KStream<String, Event> createRequestEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.CREATE_VESSELTYPE.toString().equals(event.getType()))); .filter((id, event) -> (VesselTypeEventTypes.CREATE.toString().equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito createConfirmedEvents.join(createRequestEvents, Loading @@ -44,9 +44,9 @@ public class VesselTypeEventStreams extends EventStreams { private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) { assert requestEvent.getType().equals(VesselTypeEventType.CREATE_VESSELTYPE.name()); assert requestEvent.getType().equals(VesselTypeEventTypes.CREATE); assert confirmedEvent.getType().equals(VesselTypeEventType.CREATE_VESSELTYPE_CONFIRMED.name()); assert confirmedEvent.getType().equals(VesselTypeEventTypes.CREATE_CONFIRMED); if (!isSameSession(confirmedEvent, requestEvent)) { return null; Loading @@ -65,12 +65,12 @@ public class VesselTypeEventStreams extends EventStreams { protected void processUpdatedStream(KStream<String, Event> vesselTypeEvents) { // Stream filtrado por eventos de confirmación al modificar KStream<String, Event> updateConfirmedEvents = vesselTypeEvents.filter( (id, event) -> (VesselTypeEventType.UPDATE_VESSELTYPE_CONFIRMED.toString().equals(event.getType()))); KStream<String, Event> updateConfirmedEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.UPDATE_CONFIRMED.toString().equals(event.getType()))); // Stream filtrado por eventos de petición de modificar KStream<String, Event> updateRequestEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.UPDATE_VESSELTYPE.toString().equals(event.getType()))); .filter((id, event) -> (VesselTypeEventTypes.UPDATE.toString().equals(event.getType()))); // Join por id, mandando a kafka el evento de éxito updateConfirmedEvents.join(updateRequestEvents, Loading @@ -80,9 +80,9 @@ public class VesselTypeEventStreams extends EventStreams { private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) { assert requestEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE.name()); assert requestEvent.getType().equals(VesselTypeEventTypes.UPDATE); assert confirmedEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE_CONFIRMED.name()); assert confirmedEvent.getType().equals(VesselTypeEventTypes.UPDATE_CONFIRMED); if (!isSameSession(confirmedEvent, requestEvent)) { return null; Loading @@ -105,8 +105,8 @@ public class VesselTypeEventStreams extends EventStreams { // Stream filtrado por eventos de creaciones y modificaciones correctos (solo el // último que se produzca por id) KStream<String, Event> successEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.VESSELTYPE_CREATED.toString().equals(event.getType()) || VesselTypeEventType.VESSELTYPE_UPDATED.toString().equals(event.getType()))); .filter((id, event) -> (VesselTypeEventTypes.CREATED.toString().equals(event.getType()) || VesselTypeEventTypes.VESSELTYPE_UPDATED.toString().equals(event.getType()))); processUpdateFailedStream(vesselTypeEvents, successEvents); Loading @@ -118,8 +118,8 @@ public class VesselTypeEventStreams extends EventStreams { KStream<String, Event> successEvents) { // Stream filtrado por eventos de fallo al modificar KStream<String, Event> failedEvents = vesselTypeEvents.filter( (id, event) -> (VesselTypeEventType.UPDATE_VESSELTYPE_FAILED.toString().equals(event.getType()))); KStream<String, Event> failedEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.UPDATE_FAILED.toString().equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); Loading @@ -134,8 +134,8 @@ public class VesselTypeEventStreams extends EventStreams { KStream<String, Event> successEvents) { // Stream filtrado por eventos de fallo al borrar KStream<String, Event> failedEvents = vesselTypeEvents.filter( (id, event) -> (VesselTypeEventType.DELETE_VESSELTYPE_FAILED.toString().equals(event.getType()))); KStream<String, Event> failedEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventTypes.DELETE_FAILED.toString().equals(event.getType()))); KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue); Loading @@ -148,10 +148,10 @@ public class VesselTypeEventStreams extends EventStreams { private Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) { assert failedEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE_FAILED.name()); assert failedEvent.getType().equals(VesselTypeEventTypes.UPDATE_FAILED); assert lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_CREATED.name()) || lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_UPDATED.name()); assert lastSuccessEvent.getType().equals(VesselTypeEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselTypeEventTypes.VESSELTYPE_UPDATED); VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType(); Loading @@ -168,10 +168,10 @@ public class VesselTypeEventStreams extends EventStreams { private Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) { assert failedEvent.getType().equals(VesselTypeEventType.DELETE_VESSELTYPE_FAILED.name()); assert failedEvent.getType().equals(VesselTypeEventTypes.DELETE_FAILED); assert lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_CREATED.name()) || lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_UPDATED.name()); assert lastSuccessEvent.getType().equals(VesselTypeEventTypes.CREATED) || lastSuccessEvent.getType().equals(VesselTypeEventTypes.VESSELTYPE_UPDATED); VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType(); Loading