Commit c24fc644 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade manejo de eventos de refrecado de capas

parent 3faf46d2
Loading
Loading
Loading
Loading
+32 −1
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ package es.redmic.atlascommands.aggregate;

import es.redmic.atlascommands.commands.layer.CreateLayerCommand;
import es.redmic.atlascommands.commands.layer.DeleteLayerCommand;
import es.redmic.atlascommands.commands.layer.RefreshLayerCommand;
import es.redmic.atlascommands.commands.layer.UpdateLayerCommand;
import es.redmic.atlascommands.statestore.LayerStateStore;

@@ -28,10 +29,12 @@ import es.redmic.atlascommands.statestore.LayerStateStore;
import es.redmic.atlaslib.dto.layer.LayerDTO;
import es.redmic.atlaslib.events.layer.LayerEventTypes;
import es.redmic.atlaslib.events.layer.common.LayerEvent;
import es.redmic.atlaslib.events.layer.common.LayerRefreshEvent;
import es.redmic.atlaslib.events.layer.create.CreateLayerCancelledEvent;
import es.redmic.atlaslib.events.layer.create.CreateLayerEvent;
import es.redmic.atlaslib.events.layer.delete.CheckDeleteLayerEvent;
import es.redmic.atlaslib.events.layer.delete.LayerDeletedEvent;
import es.redmic.atlaslib.events.layer.refresh.RefreshLayerEvent;
import es.redmic.atlaslib.events.layer.update.UpdateLayerEvent;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.aggregate.Aggregate;
@@ -103,6 +106,25 @@ public class LayerAggregate extends Aggregate {
		return evt;
	}

	public RefreshLayerEvent process(RefreshLayerCommand cmd) {

		assert layerStateStore != null;

		String id = cmd.getLayerId();

		Event state = getStateFromHistory(id);

		loadFromHistory(state);

		checkState(id, state.getType());

		RefreshLayerEvent evt = new RefreshLayerEvent(cmd.getLayer());
		evt.setAggregateId(id);
		evt.setVersion(getVersion() + 1);

		return evt;
	}

	public LayerDTO getLayer() {
		return layer;
	}
@@ -141,6 +163,10 @@ public class LayerAggregate extends Aggregate {
			logger.debug("Item borrado");
			apply((LayerDeletedEvent) event);
			break;
		case "REFRESHED":
			logger.debug("Item refrescado");
			apply((LayerEvent) event);
			break;
		// CANCELLED
		case "CREATE_CANCELLED":
			logger.debug("Compensación por creación fallida");
@@ -148,7 +174,8 @@ public class LayerAggregate extends Aggregate {
			break;
		case "UPDATE_CANCELLED":
		case "DELETE_CANCELLED":
			logger.debug("Compensación por edición/borrado fallido");
		case "REFRESH_CANCELLED":
			logger.debug("Compensación por edición/borrado/refresco fallido");
			apply((LayerEvent) event);
			break;
		default:
@@ -171,6 +198,10 @@ public class LayerAggregate extends Aggregate {
		super.apply(event);
	}

	public void apply(LayerRefreshEvent event) {
		super.apply(event);
	}

	@Override
	protected void reset() {
		this.layer = null;
+51 −0
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ import org.springframework.stereotype.Component;
import es.redmic.atlascommands.aggregate.LayerAggregate;
import es.redmic.atlascommands.commands.layer.CreateLayerCommand;
import es.redmic.atlascommands.commands.layer.DeleteLayerCommand;
import es.redmic.atlascommands.commands.layer.RefreshLayerCommand;
import es.redmic.atlascommands.commands.layer.UpdateLayerCommand;
import es.redmic.atlascommands.config.UserService;
import es.redmic.atlascommands.statestore.LayerStateStore;
@@ -50,6 +51,9 @@ import es.redmic.atlaslib.events.layer.delete.DeleteLayerCheckFailedEvent;
import es.redmic.atlaslib.events.layer.delete.DeleteLayerCheckedEvent;
import es.redmic.atlaslib.events.layer.delete.DeleteLayerConfirmedEvent;
import es.redmic.atlaslib.events.layer.delete.LayerDeletedEvent;
import es.redmic.atlaslib.events.layer.refresh.LayerRefreshedEvent;
import es.redmic.atlaslib.events.layer.refresh.RefreshLayerCancelledEvent;
import es.redmic.atlaslib.events.layer.refresh.RefreshLayerEvent;
import es.redmic.atlaslib.events.layer.update.LayerUpdatedEvent;
import es.redmic.atlaslib.events.layer.update.UpdateLayerCancelledEvent;
import es.redmic.atlaslib.events.layer.update.UpdateLayerEvent;
@@ -204,6 +208,32 @@ public class LayerCommandHandler extends CommandHandler {
		return getResult(event.getSessionId(), completableFuture);
	}

	public LayerDTO refresh(RefreshLayerCommand cmd) {

		LayerAggregate agg = new LayerAggregate(layerStateStore);

		// Se procesa el comando, obteniendo el evento generado
		RefreshLayerEvent event = agg.process(cmd);

		// Si no se genera evento significa que no se va a aplicar
		if (event == null)
			return null;

		event.setUserId(userService.getUserId());

		// Si no existen excepciones, se aplica el comando
		agg.apply(event);

		// Crea la espera hasta que se responda con evento completado
		CompletableFuture<LayerDTO> completableFuture = getCompletableFeature(event.getSessionId(), agg.getLayer());

		// Emite evento para enviar a kafka
		publishToKafka(event, layerTopic);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
	}

	@KafkaHandler
	private void listen(LayerCreatedEvent event) {

@@ -224,6 +254,16 @@ public class LayerCommandHandler extends CommandHandler {
		resolveCommand(event.getSessionId());
	}

	@KafkaHandler
	private void listen(LayerRefreshedEvent event) {

		logger.debug("Layer refrescado " + event.getAggregateId());

		// El evento Refrescado se envía desde el stream

		resolveCommand(event.getSessionId());
	}

	@KafkaHandler
	private void listen(DeleteLayerCheckedEvent event) {

@@ -271,6 +311,17 @@ public class LayerCommandHandler extends CommandHandler {
				ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
	}

	@KafkaHandler
	private void listen(RefreshLayerCancelledEvent event) {

		logger.debug("Error refrescando Layer " + event.getAggregateId());

		// El evento Cancelled se envía desde el stream

		resolveCommand(event.getSessionId(),
				ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
	}

	@KafkaHandler
	private void listen(DeleteLayerCheckFailedEvent event) {