Loading pom.xml +75 −35 Original line number Diff line number Diff line <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> Loading @@ -12,23 +13,53 @@ <groupId>es.redmic.lib</groupId> <artifactId>commands-lib</artifactId> <packaging>jar</packaging> <version>0.9.0</version> <version>0.10.0</version> <name>commands-lib</name> <properties> <!-- REDMIC --> <redmic.models.version>0.8.0</redmic.models.version> <redmic.broker-lib.version>0.8.0</redmic.broker-lib.version> <redmic.rest-lib.version>0.8.0</redmic.rest-lib.version> <redmic.models.version>0.11.0</redmic.models.version> <redmic.broker-lib.version>0.11.0</redmic.broker-lib.version> <redmic.rest-lib.version>0.11.0</redmic.rest-lib.version> <redmic.user-settings-lib.version>0.2.0</redmic.user-settings-lib.version> <redmic.test-utils.version>0.10.0</redmic.test-utils.version> <!-- OTHERS --> <kafka.version>2.0.1</kafka.version> <confluent.version>5.0.1</confluent.version> <powermock-tests-utils.version>1.6.6</powermock-tests-utils.version> <commons-io.version>1.3.2</commons-io.version> <mapstruct.version>1.3.0.Final</mapstruct.version> </properties> <dependencies> <!-- Redmic --> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>models</artifactId> <version>${redmic.models.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>broker-lib</artifactId> <version>${redmic.broker-lib.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>rest-lib</artifactId> <version>${redmic.rest-lib.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>user-settings-lib</artifactId> <version>${redmic.user-settings-lib.version}</version> </dependency> <!-- OTHERS --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> Loading Loading @@ -68,25 +99,10 @@ </exclusions> </dependency> <!-- Redmic --> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>models</artifactId> <version>${redmic.models.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>broker-lib</artifactId> <version>${redmic.broker-lib.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>rest-lib</artifactId> <version>${redmic.rest-lib.version}</version> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <scope>provided</scope> </dependency> <dependency> Loading @@ -95,21 +111,45 @@ <version>${commons-io.version}</version> </dependency> <dependency> <groupId>org.mapstruct</groupId> <artifactId>mapstruct</artifactId> <version>${mapstruct.version}</version> </dependency> <!-- Test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <groupId>es.redmic.lib</groupId> <artifactId>test-utils</artifactId> <version>${redmic.test-utils.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.powermock.tests</groupId> <artifactId>powermock-tests-utils</artifactId> <version>${powermock-tests-utils.version}</version> <groupId>es.redmic.lib</groupId> <artifactId>user-settings-lib</artifactId> <version>${redmic.user-settings-lib.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <executions> <execution> <id>Jar Tests Package</id> <phase>package</phase> <goals> <goal>test-jar</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <repositories> <repository> <id>central</id> Loading src/main/java/es/redmic/commandslib/aggregate/Aggregate.java +42 −3 Original line number Diff line number Diff line Loading @@ -27,9 +27,12 @@ import org.apache.logging.log4j.Logger; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.common.EventTypes; import es.redmic.brokerlib.avro.fail.PrepareRollbackEvent; import es.redmic.brokerlib.avro.fail.RollbackFailedEvent; import es.redmic.commandslib.exceptions.HistoryNotFoundException; import es.redmic.commandslib.exceptions.ItemLockedException; import es.redmic.exception.data.ItemNotFoundException; import es.redmic.exception.settings.SettingsChangeForbiddenException; public abstract class Aggregate { Loading Loading @@ -90,11 +93,9 @@ public abstract class Aggregate { if (isLocked(event.getType()) && !event.getType().equals(EventTypes.DELETED)) { // TODO: Si el tiempo entre el último y el actual es superior a el máximo del // ciclo, compensar. logger.error("Intentando modificar un elemento bloqueado por una edición en curso, ", event.getAggregateId()); logger.error("Item bloqueado por un evento de tipo: " + event.getType()); throw new ItemLockedException("id", event.getAggregateId()); } } Loading Loading @@ -157,4 +158,42 @@ public abstract class Aggregate { public boolean isDeleted() { return deleted; } protected void authorshipCheck(String userId, String historicalEventUserId) { if (historicalEventUserId != null && !userId.equals(historicalEventUserId)) throw new SettingsChangeForbiddenException(); } public Event getRollbackEventFromBlockedEvent(String id, long timeoutMS) { Event blockedEvent = getItemFromStateStore(id); if (blockedEvent != null && blockedEventRequiresRollback(blockedEvent, timeoutMS)) return getRollbackEvent(blockedEvent); return null; } private boolean blockedEventRequiresRollback(Event event, long timeoutMS) { return event.getDate().plus(timeoutMS).isAfterNow(); } public Event getRollbackEvent(Event sourceEvent) { logger.error( "Un error en el sistema ha dejado un evento en estado bloqueado. Generando evento rollback para evento bloqueado de tipo " + sourceEvent.getType() + ". ItemId : " + sourceEvent.getAggregateId()); String failEventType = null; if (sourceEvent instanceof RollbackFailedEvent) failEventType = ((RollbackFailedEvent) sourceEvent).getFailEventType(); else failEventType = sourceEvent.getType(); PrepareRollbackEvent event = new PrepareRollbackEvent().buildFrom(sourceEvent); event.setFailEventType(failEventType); return event; } } src/main/java/es/redmic/commandslib/commands/CommandHandler.java +67 −15 Original line number Diff line number Diff line Loading @@ -30,12 +30,16 @@ import java.util.concurrent.TimeoutException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.kafka.annotation.KafkaHandler; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.fail.RollbackFailedEvent; import es.redmic.commandslib.aggregate.Aggregate; import es.redmic.commandslib.exceptions.ConfirmationTimeoutException; import es.redmic.commandslib.gateway.BrokerEvent; import es.redmic.exception.common.BaseException; Loading @@ -43,13 +47,23 @@ import es.redmic.exception.common.BaseException; public abstract class CommandHandler implements ApplicationEventPublisherAware { @Value("${rest.eventsource.timeout.ms}") private long timeoutMS; protected long timeoutMS; protected static Logger logger = LogManager.getLogger(); protected ApplicationEventPublisher eventPublisher; protected Map<String, CompletableFuture<BaseException>> completableFeatures = new HashMap<>(); protected Map<String, CompletableFuture<Object>> completableFeatures = new HashMap<>(); @Autowired AlertService alertService; @KafkaHandler private void listen(RollbackFailedEvent event) { alertService.errorAlert("Rollback fallido " + event.getAggregateId(), "Rollback de evento " + event.getFailEventType() + " con id " + event.getAggregateId() + " ha fallado."); } @Override public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) { Loading @@ -71,15 +85,15 @@ public abstract class CommandHandler implements ApplicationEventPublisherAware { resolveCommand(sessionId, null); } protected void resolveCommand(String sessionId, BaseException ex) { protected void resolveCommand(String sessionId, Object result) { // Si el evento es una excepción se resuelve con ella, si no, con null que // significa que todo fue bien Executors.newCachedThreadPool().submit(() -> { CompletableFuture<BaseException> future = completableFeatures.get(sessionId); CompletableFuture<Object> future = completableFeatures.get(sessionId); if (future != null) { future.complete(ex);// future.complete(ex); future.complete(result); } else { logger.warn("Petición asíncrona no resgistrada para sessionId: " + sessionId); } Loading @@ -87,24 +101,57 @@ public abstract class CommandHandler implements ApplicationEventPublisherAware { }); } protected void unlockStatus(Aggregate agg, String id, String topic) { Event rollbackEvent = agg.getRollbackEventFromBlockedEvent(id, timeoutMS); if (rollbackEvent != null) { alertService.errorAlert(rollbackEvent.getType() + " rollback", "Enviando rollback de evento " + rollbackEvent.getType() + " con id " + rollbackEvent.getAggregateId()); publishToKafka(rollbackEvent, topic); } } protected <T> T sendEventAndWaitResult(Aggregate agg, Event event, String topic) { // Crea la espera hasta que se responda con evento completado CompletableFuture<T> completableFuture = getCompletableFeature(event.getSessionId()); // Emite evento para enviar a kafka publishToKafka(event, topic); // Obtiene el resultado cuando se resuelva la espera try { return getResult(event.getSessionId(), completableFuture); } catch (ConfirmationTimeoutException e) { e.printStackTrace(); alertService.errorAlert(event.getType() + " rollback", "Enviando rollback de evento " + event.getType() + " con id " + event.getAggregateId() + " " + e.getLocalizedMessage()); publishToKafka(agg.getRollbackEvent(event), topic); throw e; } } // Crea un completableFuture para esperar por el evento de confirmación o error. protected <T> CompletableFuture<T> getCompletableFeature(String sessionId, T item) { protected <T> CompletableFuture<T> getCompletableFeature(String sessionId) { // Añade espera para resolver la petición CompletableFuture<BaseException> future = new CompletableFuture<BaseException>(); CompletableFuture<Object> future = new CompletableFuture<Object>(); completableFeatures.put(sessionId, future); // Cuando se resuelve la espera, se resuelve con el dto return future.thenApplyAsync(ex -> apply(ex, item)); return future.thenApplyAsync(obj -> apply(obj)); } private <T> T apply(BaseException ex, T item) { @SuppressWarnings("unchecked") private <T> T apply(Object result) { if (ex == null) { if (!(result instanceof BaseException)) { logger.debug("Resolver con éxito"); return item; return (T) result; } else { logger.debug("Error. Lanzar excepción."); throw ex; throw (BaseException) result; } } Loading @@ -119,13 +166,18 @@ public abstract class CommandHandler implements ApplicationEventPublisherAware { try { return completableFuture.get(timeoutMS, TimeUnit.MILLISECONDS); } catch (InterruptedException | TimeoutException e) { // TODO: Enviar alerta ya que ha quedado un evento sin acabar el ciclo e.printStackTrace(); logger.error("Error. No se ha recibido confirmación de la acción realizada."); throw new ConfirmationTimeoutException(); } catch (ExecutionException e) { e.printStackTrace(); if (e.getCause() instanceof BaseException) throw ((BaseException) e.getCause()); // Error enviado desde la vista else logger.error("Error. Excepción no controlada en la ejecución."); throw new ConfirmationTimeoutException(); } finally { completableFeatures.remove(sessionId); Loading src/main/java/es/redmic/commandslib/config/GenerateJsonSchemaScanBean.java +3 −1 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.AbstractEnvironment; import org.springframework.core.env.Environment; import org.springframework.core.env.MapPropertySource; Loading @@ -48,6 +49,7 @@ import com.kjetland.jackson.jsonSchema.JsonSchemaResources; import es.redmic.commandslib.controller.CommandBaseController; import es.redmic.exception.mediastorage.MSFileUploadException; @Configuration public class GenerateJsonSchemaScanBean implements ApplicationContextAware { @Autowired Loading Loading @@ -151,7 +153,7 @@ public class GenerateJsonSchemaScanBean implements ApplicationContextAware { if (properties.isEmpty()) { String serverPath = env.getProperty("server.servlet.context-path") + env.getProperty("spring.mvc.servlet.path"); + env.getProperty("microservice.view.path"); for (Iterator it = ((AbstractEnvironment) env).getPropertySources().iterator(); it.hasNext();) { PropertySource propertySource = (PropertySource) it.next(); if (propertySource instanceof MapPropertySource) { Loading src/main/java/es/redmic/commandslib/streaming/common/BaseStreams.java +15 −0 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; public abstract class BaseStreams { Loading Loading @@ -119,4 +120,18 @@ public abstract class BaseStreams { } } } protected boolean isSameSession(Event a, Event b) { if (!(a.getSessionId().equals(b.getSessionId()))) { String message = "Evento de petición " + b.getType() + " con id de sesión " + b.getSessionId() + ", el cual es diferente al evento de confirmación " + a.getType() + " con id de sesión " + a.getSessionId() + " para item " + b.getAggregateId() + "|" + b.getDate() + " (" + a.getAggregateId() + "|" + a.getDate() + ")"; logger.error(message); alertService.errorAlert(a.getAggregateId(), message); return false; } return true; } } Loading
pom.xml +75 −35 Original line number Diff line number Diff line <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> Loading @@ -12,23 +13,53 @@ <groupId>es.redmic.lib</groupId> <artifactId>commands-lib</artifactId> <packaging>jar</packaging> <version>0.9.0</version> <version>0.10.0</version> <name>commands-lib</name> <properties> <!-- REDMIC --> <redmic.models.version>0.8.0</redmic.models.version> <redmic.broker-lib.version>0.8.0</redmic.broker-lib.version> <redmic.rest-lib.version>0.8.0</redmic.rest-lib.version> <redmic.models.version>0.11.0</redmic.models.version> <redmic.broker-lib.version>0.11.0</redmic.broker-lib.version> <redmic.rest-lib.version>0.11.0</redmic.rest-lib.version> <redmic.user-settings-lib.version>0.2.0</redmic.user-settings-lib.version> <redmic.test-utils.version>0.10.0</redmic.test-utils.version> <!-- OTHERS --> <kafka.version>2.0.1</kafka.version> <confluent.version>5.0.1</confluent.version> <powermock-tests-utils.version>1.6.6</powermock-tests-utils.version> <commons-io.version>1.3.2</commons-io.version> <mapstruct.version>1.3.0.Final</mapstruct.version> </properties> <dependencies> <!-- Redmic --> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>models</artifactId> <version>${redmic.models.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>broker-lib</artifactId> <version>${redmic.broker-lib.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>rest-lib</artifactId> <version>${redmic.rest-lib.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>user-settings-lib</artifactId> <version>${redmic.user-settings-lib.version}</version> </dependency> <!-- OTHERS --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> Loading Loading @@ -68,25 +99,10 @@ </exclusions> </dependency> <!-- Redmic --> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>models</artifactId> <version>${redmic.models.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>broker-lib</artifactId> <version>${redmic.broker-lib.version}</version> </dependency> <dependency> <groupId>es.redmic.lib</groupId> <artifactId>rest-lib</artifactId> <version>${redmic.rest-lib.version}</version> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <scope>provided</scope> </dependency> <dependency> Loading @@ -95,21 +111,45 @@ <version>${commons-io.version}</version> </dependency> <dependency> <groupId>org.mapstruct</groupId> <artifactId>mapstruct</artifactId> <version>${mapstruct.version}</version> </dependency> <!-- Test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <groupId>es.redmic.lib</groupId> <artifactId>test-utils</artifactId> <version>${redmic.test-utils.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.powermock.tests</groupId> <artifactId>powermock-tests-utils</artifactId> <version>${powermock-tests-utils.version}</version> <groupId>es.redmic.lib</groupId> <artifactId>user-settings-lib</artifactId> <version>${redmic.user-settings-lib.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <executions> <execution> <id>Jar Tests Package</id> <phase>package</phase> <goals> <goal>test-jar</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <repositories> <repository> <id>central</id> Loading
src/main/java/es/redmic/commandslib/aggregate/Aggregate.java +42 −3 Original line number Diff line number Diff line Loading @@ -27,9 +27,12 @@ import org.apache.logging.log4j.Logger; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.common.EventTypes; import es.redmic.brokerlib.avro.fail.PrepareRollbackEvent; import es.redmic.brokerlib.avro.fail.RollbackFailedEvent; import es.redmic.commandslib.exceptions.HistoryNotFoundException; import es.redmic.commandslib.exceptions.ItemLockedException; import es.redmic.exception.data.ItemNotFoundException; import es.redmic.exception.settings.SettingsChangeForbiddenException; public abstract class Aggregate { Loading Loading @@ -90,11 +93,9 @@ public abstract class Aggregate { if (isLocked(event.getType()) && !event.getType().equals(EventTypes.DELETED)) { // TODO: Si el tiempo entre el último y el actual es superior a el máximo del // ciclo, compensar. logger.error("Intentando modificar un elemento bloqueado por una edición en curso, ", event.getAggregateId()); logger.error("Item bloqueado por un evento de tipo: " + event.getType()); throw new ItemLockedException("id", event.getAggregateId()); } } Loading Loading @@ -157,4 +158,42 @@ public abstract class Aggregate { public boolean isDeleted() { return deleted; } protected void authorshipCheck(String userId, String historicalEventUserId) { if (historicalEventUserId != null && !userId.equals(historicalEventUserId)) throw new SettingsChangeForbiddenException(); } public Event getRollbackEventFromBlockedEvent(String id, long timeoutMS) { Event blockedEvent = getItemFromStateStore(id); if (blockedEvent != null && blockedEventRequiresRollback(blockedEvent, timeoutMS)) return getRollbackEvent(blockedEvent); return null; } private boolean blockedEventRequiresRollback(Event event, long timeoutMS) { return event.getDate().plus(timeoutMS).isAfterNow(); } public Event getRollbackEvent(Event sourceEvent) { logger.error( "Un error en el sistema ha dejado un evento en estado bloqueado. Generando evento rollback para evento bloqueado de tipo " + sourceEvent.getType() + ". ItemId : " + sourceEvent.getAggregateId()); String failEventType = null; if (sourceEvent instanceof RollbackFailedEvent) failEventType = ((RollbackFailedEvent) sourceEvent).getFailEventType(); else failEventType = sourceEvent.getType(); PrepareRollbackEvent event = new PrepareRollbackEvent().buildFrom(sourceEvent); event.setFailEventType(failEventType); return event; } }
src/main/java/es/redmic/commandslib/commands/CommandHandler.java +67 −15 Original line number Diff line number Diff line Loading @@ -30,12 +30,16 @@ import java.util.concurrent.TimeoutException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.kafka.annotation.KafkaHandler; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.fail.RollbackFailedEvent; import es.redmic.commandslib.aggregate.Aggregate; import es.redmic.commandslib.exceptions.ConfirmationTimeoutException; import es.redmic.commandslib.gateway.BrokerEvent; import es.redmic.exception.common.BaseException; Loading @@ -43,13 +47,23 @@ import es.redmic.exception.common.BaseException; public abstract class CommandHandler implements ApplicationEventPublisherAware { @Value("${rest.eventsource.timeout.ms}") private long timeoutMS; protected long timeoutMS; protected static Logger logger = LogManager.getLogger(); protected ApplicationEventPublisher eventPublisher; protected Map<String, CompletableFuture<BaseException>> completableFeatures = new HashMap<>(); protected Map<String, CompletableFuture<Object>> completableFeatures = new HashMap<>(); @Autowired AlertService alertService; @KafkaHandler private void listen(RollbackFailedEvent event) { alertService.errorAlert("Rollback fallido " + event.getAggregateId(), "Rollback de evento " + event.getFailEventType() + " con id " + event.getAggregateId() + " ha fallado."); } @Override public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) { Loading @@ -71,15 +85,15 @@ public abstract class CommandHandler implements ApplicationEventPublisherAware { resolveCommand(sessionId, null); } protected void resolveCommand(String sessionId, BaseException ex) { protected void resolveCommand(String sessionId, Object result) { // Si el evento es una excepción se resuelve con ella, si no, con null que // significa que todo fue bien Executors.newCachedThreadPool().submit(() -> { CompletableFuture<BaseException> future = completableFeatures.get(sessionId); CompletableFuture<Object> future = completableFeatures.get(sessionId); if (future != null) { future.complete(ex);// future.complete(ex); future.complete(result); } else { logger.warn("Petición asíncrona no resgistrada para sessionId: " + sessionId); } Loading @@ -87,24 +101,57 @@ public abstract class CommandHandler implements ApplicationEventPublisherAware { }); } protected void unlockStatus(Aggregate agg, String id, String topic) { Event rollbackEvent = agg.getRollbackEventFromBlockedEvent(id, timeoutMS); if (rollbackEvent != null) { alertService.errorAlert(rollbackEvent.getType() + " rollback", "Enviando rollback de evento " + rollbackEvent.getType() + " con id " + rollbackEvent.getAggregateId()); publishToKafka(rollbackEvent, topic); } } protected <T> T sendEventAndWaitResult(Aggregate agg, Event event, String topic) { // Crea la espera hasta que se responda con evento completado CompletableFuture<T> completableFuture = getCompletableFeature(event.getSessionId()); // Emite evento para enviar a kafka publishToKafka(event, topic); // Obtiene el resultado cuando se resuelva la espera try { return getResult(event.getSessionId(), completableFuture); } catch (ConfirmationTimeoutException e) { e.printStackTrace(); alertService.errorAlert(event.getType() + " rollback", "Enviando rollback de evento " + event.getType() + " con id " + event.getAggregateId() + " " + e.getLocalizedMessage()); publishToKafka(agg.getRollbackEvent(event), topic); throw e; } } // Crea un completableFuture para esperar por el evento de confirmación o error. protected <T> CompletableFuture<T> getCompletableFeature(String sessionId, T item) { protected <T> CompletableFuture<T> getCompletableFeature(String sessionId) { // Añade espera para resolver la petición CompletableFuture<BaseException> future = new CompletableFuture<BaseException>(); CompletableFuture<Object> future = new CompletableFuture<Object>(); completableFeatures.put(sessionId, future); // Cuando se resuelve la espera, se resuelve con el dto return future.thenApplyAsync(ex -> apply(ex, item)); return future.thenApplyAsync(obj -> apply(obj)); } private <T> T apply(BaseException ex, T item) { @SuppressWarnings("unchecked") private <T> T apply(Object result) { if (ex == null) { if (!(result instanceof BaseException)) { logger.debug("Resolver con éxito"); return item; return (T) result; } else { logger.debug("Error. Lanzar excepción."); throw ex; throw (BaseException) result; } } Loading @@ -119,13 +166,18 @@ public abstract class CommandHandler implements ApplicationEventPublisherAware { try { return completableFuture.get(timeoutMS, TimeUnit.MILLISECONDS); } catch (InterruptedException | TimeoutException e) { // TODO: Enviar alerta ya que ha quedado un evento sin acabar el ciclo e.printStackTrace(); logger.error("Error. No se ha recibido confirmación de la acción realizada."); throw new ConfirmationTimeoutException(); } catch (ExecutionException e) { e.printStackTrace(); if (e.getCause() instanceof BaseException) throw ((BaseException) e.getCause()); // Error enviado desde la vista else logger.error("Error. Excepción no controlada en la ejecución."); throw new ConfirmationTimeoutException(); } finally { completableFeatures.remove(sessionId); Loading
src/main/java/es/redmic/commandslib/config/GenerateJsonSchemaScanBean.java +3 −1 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.AbstractEnvironment; import org.springframework.core.env.Environment; import org.springframework.core.env.MapPropertySource; Loading @@ -48,6 +49,7 @@ import com.kjetland.jackson.jsonSchema.JsonSchemaResources; import es.redmic.commandslib.controller.CommandBaseController; import es.redmic.exception.mediastorage.MSFileUploadException; @Configuration public class GenerateJsonSchemaScanBean implements ApplicationContextAware { @Autowired Loading Loading @@ -151,7 +153,7 @@ public class GenerateJsonSchemaScanBean implements ApplicationContextAware { if (properties.isEmpty()) { String serverPath = env.getProperty("server.servlet.context-path") + env.getProperty("spring.mvc.servlet.path"); + env.getProperty("microservice.view.path"); for (Iterator it = ((AbstractEnvironment) env).getPropertySources().iterator(); it.hasNext();) { PropertySource propertySource = (PropertySource) it.next(); if (propertySource instanceof MapPropertySource) { Loading
src/main/java/es/redmic/commandslib/streaming/common/BaseStreams.java +15 −0 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; public abstract class BaseStreams { Loading Loading @@ -119,4 +120,18 @@ public abstract class BaseStreams { } } } protected boolean isSameSession(Event a, Event b) { if (!(a.getSessionId().equals(b.getSessionId()))) { String message = "Evento de petición " + b.getType() + " con id de sesión " + b.getSessionId() + ", el cual es diferente al evento de confirmación " + a.getType() + " con id de sesión " + a.getSessionId() + " para item " + b.getAggregateId() + "|" + b.getDate() + " (" + a.getAggregateId() + "|" + a.getDate() + ")"; logger.error(message); alertService.errorAlert(a.getAggregateId(), message); return false; } return true; } }