Commit abdee9a0 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade métodos para manejar elementos bloqueados

De esta manera si un elemento quedó bloqueado o se produce un timeout,
se puede revertir
parent 7c2d1437
Loading
Loading
Loading
Loading
+22 −3
Original line number Diff line number Diff line
@@ -27,10 +27,12 @@ import org.apache.logging.log4j.Logger;

import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventTypes;
import es.redmic.brokerlib.avro.fail.RollbackEvent;
import es.redmic.commandslib.exceptions.HistoryNotFoundException;
import es.redmic.commandslib.exceptions.ItemLockedException;
import es.redmic.exception.data.ItemNotFoundException;
import es.redmic.exception.settings.SettingsChangeForbiddenException;
import es.redmic.usersettingslib.events.SettingsEventTypes;

public abstract class Aggregate {

@@ -91,9 +93,6 @@ public abstract class Aggregate {

		if (isLocked(event.getType()) && !event.getType().equals(EventTypes.DELETED)) {

			// TODO: Si el tiempo entre el último y el actual es superior a el máximo del
			// ciclo, compensar.

			logger.error("Intentando modificar un elemento bloqueado por una edición en curso, ",
					event.getAggregateId());
			throw new ItemLockedException("id", event.getAggregateId());
@@ -164,4 +163,24 @@ public abstract class Aggregate {
		if (historicalEventUserId != null && !userId.equals(historicalEventUserId))
			throw new SettingsChangeForbiddenException();
	}

	public Event getRollbackEventFromBlockedEvent(String id, long timeoutMS) {

		Event blockedEvent = getItemFromStateStore(id);

		if (!SettingsEventTypes.isSnapshot(blockedEvent.getType())
				&& blockedEventRequiresRollback(blockedEvent, timeoutMS))
			return getRollbackEvent(blockedEvent);
		return null;
	}

	private boolean blockedEventRequiresRollback(Event event, long timeoutMS) {

		return event.getDate().plus(timeoutMS).isAfterNow();
	}

	public Event getRollbackEvent(Event sourceEvent) {

		return new RollbackEvent().buildFrom(sourceEvent);
	}
}
+36 −16
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.kafka.annotation.KafkaHandler;

import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.aggregate.Aggregate;
import es.redmic.commandslib.exceptions.ConfirmationTimeoutException;
import es.redmic.commandslib.gateway.BrokerEvent;
import es.redmic.exception.common.BaseException;
@@ -43,7 +44,7 @@ import es.redmic.exception.common.BaseException;
public abstract class CommandHandler implements ApplicationEventPublisherAware {

	@Value("${rest.eventsource.timeout.ms}")
	private long timeoutMS;
	protected long timeoutMS;

	protected static Logger logger = LogManager.getLogger();

@@ -87,12 +88,38 @@ public abstract class CommandHandler implements ApplicationEventPublisherAware {
		});
	}

	protected void unlockStatus(Aggregate agg, String id, String topic) {

		Event rollbackEvent = agg.getRollbackEventFromBlockedEvent(id, timeoutMS);

		if (rollbackEvent != null)
			publishToKafka(rollbackEvent, topic);
	}

	protected <T> T sendEventAndWaitResult(Aggregate agg, Event event, String topic) {

		// Crea la espera hasta que se responda con evento completado
		CompletableFuture<T> completableFuture = getCompletableFeature(event.getSessionId());

		// Emite evento para enviar a kafka
		publishToKafka(event, topic);

		// Obtiene el resultado cuando se resuelva la espera
		try {
			return getResult(event.getSessionId(), completableFuture);
		} catch (ConfirmationTimeoutException e) {
			publishToKafka(agg.getRollbackEvent(event), topic);
			throw e;
		}
	}

	// Crea un completableFuture para esperar por el evento de confirmación o error.
	protected <T> CompletableFuture<T> getCompletableFeature(String sessionId) {

		// Añade espera para resolver la petición
		CompletableFuture<Object> future = new CompletableFuture<Object>();
		completableFeatures.put(sessionId, future);

		// Cuando se resuelve la espera, se resuelve con el dto
		return future.thenApplyAsync(obj -> apply(obj));
	}
@@ -120,28 +147,21 @@ public abstract class CommandHandler implements ApplicationEventPublisherAware {
		try {
			return completableFuture.get(timeoutMS, TimeUnit.MILLISECONDS);
		} catch (InterruptedException | TimeoutException e) {
			// TODO: Enviar alerta ya que ha quedado un evento sin acabar el ciclo
			e.printStackTrace();
			logger.error("Error. No se ha recibido confirmación de la acción realizada.");

			throw new ConfirmationTimeoutException();
		} catch (ExecutionException e) {
			e.printStackTrace();

			if (e.getCause() instanceof BaseException)
				throw ((BaseException) e.getCause()); // Error enviado desde la vista
			else

			logger.error("Error. Excepción no controlada en la ejecución.");

			throw new ConfirmationTimeoutException();
		} finally {
			completableFeatures.remove(sessionId);
		}
	}

	protected <T> T sendEventAndWaitResult(Event event, String topic) {

		// Crea la espera hasta que se responda con evento completado
		CompletableFuture<T> completableFuture = getCompletableFeature(event.getSessionId());

		// Emite evento para enviar a kafka
		publishToKafka(event, topic);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
	}
}