Commit 512840ae authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade componentes para realizar postupdate

Cuando se recibe un evento de relación modificada, se genera un evento
de edición parcial por cada elemento que contenga esta relación.

Además se añade stream para crear evento updated al recibir la
confirmación de edición parcial por parte de la vista
parent 10bd919b
Loading
Loading
Loading
Loading
+23 −0
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ import java.util.concurrent.CompletableFuture;

import javax.annotation.PostConstruct;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaHandler;
@@ -58,7 +60,9 @@ import es.redmic.atlaslib.events.layer.refresh.RefreshLayerEvent;
import es.redmic.atlaslib.events.layer.update.LayerUpdatedEvent;
import es.redmic.atlaslib.events.layer.update.UpdateLayerCancelledEvent;
import es.redmic.atlaslib.events.layer.update.UpdateLayerEnrichedEvent;
import es.redmic.atlaslib.events.themeinspire.update.ThemeInspireUpdatedEvent;
import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.commands.CommandHandler;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamConfig.Builder;
@@ -355,4 +359,23 @@ public class LayerCommandHandler extends CommandHandler {
		resolveCommand(event.getSessionId(),
				ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
	}

	@KafkaListener(topics = "${broker.topic.theme.inspire.updated}")
	private void listen(ThemeInspireUpdatedEvent event) {

		KeyValueIterator<String, Event> iteratble = layerStateStore.getAll();
		while (iteratble.hasNext()) {
			final KeyValue<String, Event> next = iteratble.next();

			Event layerEvent = next.value;

			if (LayerEventTypes.isSnapshot(layerEvent.getType()) && ((LayerEvent) layerEvent).getLayer()
					.getThemeInspire().getId().equals(event.getThemeInspire().getId())) {

				publishToKafka(LayerEventFactory.getEvent(layerEvent, event, LayerEventTypes.UPDATE_THEMEINSPIRE),
						layerTopic);
			}
		}
		iteratble.close();
	}
}
+54 −3
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@ import es.redmic.atlaslib.events.layer.LayerEventFactory;
import es.redmic.atlaslib.events.layer.LayerEventTypes;
import es.redmic.atlaslib.events.layer.common.LayerEvent;
import es.redmic.atlaslib.events.layer.create.CreateLayerEnrichedEvent;
import es.redmic.atlaslib.events.layer.partialupdate.themeinspire.UpdateThemeInspireInLayerEvent;
import es.redmic.atlaslib.events.layer.update.UpdateLayerEnrichedEvent;
import es.redmic.atlaslib.events.themeinspire.ThemeInspireEventTypes;
import es.redmic.atlaslib.events.themeinspire.common.ThemeInspireEvent;
@@ -93,8 +94,9 @@ public class LayerEventStreams extends EventSourcingStreams {
								&& getThemeInspireIdFromLayer(event) != null))
				.selectKey((k, v) -> getThemeInspireIdFromLayer(v));

		enrichCreateEvents.leftJoin(themeInspire, (k, v) -> k,
				(enrichCreateEvent, vesselTypeEvent) -> getEnrichCreateResultEvent(enrichCreateEvent, vesselTypeEvent))
		enrichCreateEvents
				.leftJoin(themeInspire, (k, v) -> k, (enrichCreateEvent,
						themeInspireEvent) -> getEnrichCreateResultEvent(enrichCreateEvent, themeInspireEvent))
				.selectKey((k, v) -> v.getAggregateId()).to(topic);
	}

@@ -266,10 +268,59 @@ public class LayerEventStreams extends EventSourcingStreams {
				eventError.getExceptionType(), eventError.getArguments());
	}

	/**
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, envía evento modificado.
	 */
	@Override
	protected void processPartialUpdatedStream(KStream<String, Event> layerEvents,
			KStream<String, Event> updateConfirmedEvents) {
		// En este caso no hay modificaciones parciales

		// Table filtrado por eventos de petición de modificar themeInspire (Siempre el
		// último evento)
		KTable<String, Event> updateRequestEvents = layerEvents
				.filter((id, event) -> (LayerEventTypes.UPDATE_THEMEINSPIRE.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		KStream<String, UpdateThemeInspireInLayerEvent> partialUpdateEvent = updateConfirmedEvents.join(
				updateRequestEvents,
				(confirmedEvent, requestEvent) -> isSameSession(confirmedEvent, requestEvent)
						? (UpdateThemeInspireInLayerEvent) requestEvent
						: null);

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)
		KStream<String, Event> successEvents = layerEvents
				.filter((id, event) -> (LayerEventTypes.isSnapshot(event.getType())
						&& !LayerEventTypes.DELETED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de confirmación
		partialUpdateEvent.join(successEventsTable, (partialUpdateConfirmEvent,
				lastSuccessEvent) -> getUpdatedEventFromPartialUpdate(partialUpdateConfirmEvent, lastSuccessEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	/**
	 * Función que a partir del último evento correcto + el evento de edición
	 * parcial + la confirmación de la vista, si todo es correcto, genera evento
	 * updated
	 */

	private Event getUpdatedEventFromPartialUpdate(UpdateThemeInspireInLayerEvent partialUpdateConfirmEvent,
			Event lastSuccessEvent) {

		assert (LayerEventTypes.isSnapshot(lastSuccessEvent.getType())
				&& !lastSuccessEvent.getType().equals(LayerEventTypes.DELETED));

		assert partialUpdateConfirmEvent.getType().equals(LayerEventTypes.UPDATE_THEMEINSPIRE);

		LayerDTO layer = ((LayerEvent) lastSuccessEvent).getLayer();
		layer.setThemeInspire(partialUpdateConfirmEvent.getThemeInspire());

		return LayerEventFactory.getEvent(partialUpdateConfirmEvent, LayerEventTypes.UPDATED, layer);
	}

	@Override