Commit 70606a65 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade stream de enriquecer layer con themeinspire

parent d4299af6
Loading
Loading
Loading
Loading
+70 −6
Original line number Diff line number Diff line
package es.redmic.atlascommands.streams;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.kstream.GlobalKTable;

/*-
 * #%L
@@ -26,19 +30,30 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import es.redmic.atlaslib.dto.layer.LayerDTO;
import es.redmic.atlaslib.dto.themeinspire.ThemeInspireDTO;
import es.redmic.atlaslib.events.layer.LayerEventFactory;
import es.redmic.atlaslib.events.layer.LayerEventTypes;
import es.redmic.atlaslib.events.layer.common.LayerEvent;
import es.redmic.atlaslib.events.layer.create.CreateLayerEnrichedEvent;
import es.redmic.atlaslib.events.themeinspire.ThemeInspireEventTypes;
import es.redmic.atlaslib.events.themeinspire.common.ThemeInspireEvent;
import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
import es.redmic.brokerlib.avro.common.EventTypes;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.streams.EventSourcingStreams;
import es.redmic.exception.common.ExceptionType;

public class LayerEventStreams extends EventSourcingStreams {

	public LayerEventStreams(StreamConfig config, AlertService alertService) {
	private String themeInspireTopic;

	private GlobalKTable<String, Event> themeInspire;

	public LayerEventStreams(StreamConfig config, String themeInspireTopic, AlertService alertService) {
		super(config, alertService);
		this.themeInspireTopic = themeInspireTopic + snapshotTopicSuffix;
		init();
	}

@@ -49,6 +64,8 @@ public class LayerEventStreams extends EventSourcingStreams {

	@Override
	protected void createExtraStreams() {

		themeInspire = builder.globalTable(themeInspireTopic);
	}

	/**
@@ -60,6 +77,51 @@ public class LayerEventStreams extends EventSourcingStreams {
		events.filter((id, event) -> (LayerEventTypes.isSnapshot(event.getType()))).to(snapshotTopic);
	}

	/**
	 * Función que a partir de los eventos de tipo CreateEnrich y globalKTable de
	 * las relaciones, enriquece el item antes de mandarlo a crear
	 * 
	 */

	@Override
	protected void processEnrichCreateSteam(KStream<String, Event> events) {

		KStream<String, Event> enrichCreateEvents = events
				.filter((id,
						event) -> (EventTypes.ENRICH_CREATE.equals(event.getType())
								&& getThemeInspireIdFromLayer(event) != null))
				.selectKey((k, v) -> getThemeInspireIdFromLayer(v));

		enrichCreateEvents.leftJoin(themeInspire, (k, v) -> k,
				(enrichCreateEvent, vesselTypeEvent) -> getEnrichCreateResultEvent(enrichCreateEvent, vesselTypeEvent))
				.selectKey((k, v) -> v.getAggregateId()).to(topic);
	}

	private Event getEnrichCreateResultEvent(Event enrichCreateEvents, Event themeInspireEvent) {

		if (themeInspireEvent != null && !themeInspireEvent.getType().equals(ThemeInspireEventTypes.DELETED)) {

			CreateLayerEnrichedEvent event = (CreateLayerEnrichedEvent) LayerEventFactory.getEvent(enrichCreateEvents,
					LayerEventTypes.CREATE_ENRICHED, ((LayerEvent) enrichCreateEvents).getLayer());

			((LayerEvent) event).getLayer().setThemeInspire(((ThemeInspireEvent) themeInspireEvent).getThemeInspire());

			return event;
		} else {

			String themeInspireId = getThemeInspireIdFromLayer(enrichCreateEvents);

			logger.warn("Intentando enriquecer " + enrichCreateEvents.getAggregateId()
					+ " con un elemento que no existe: themeInspire.id = " + themeInspireId);

			Map<String, String> arguments = new HashMap<>();
			arguments.put("themeInspire.id", themeInspireId);

			return LayerEventFactory.getEvent(enrichCreateEvents, LayerEventTypes.CREATE_FAILED,
					ExceptionType.ITEM_NOT_FOUND.toString(), arguments);
		}
	}

	/**
	 * Función que apartir del evento de confirmación de la vista y del evento
	 * create (petición de creación), si todo es correcto, genera evento created
@@ -160,11 +222,6 @@ public class LayerEventStreams extends EventSourcingStreams {
				eventError.getExceptionType(), eventError.getArguments());
	}

	@Override
	protected void processEnrichCreateSteam(KStream<String, Event> events) {
		// En este caso no hay enriquecimiento
	}

	@Override
	protected void processEnrichUpdateSteam(KStream<String, Event> events) {
		// En este caso no hay enriquecimiento
@@ -272,4 +329,11 @@ public class LayerEventStreams extends EventSourcingStreams {
		return LayerEventFactory.getEvent(failedEvent, LayerEventTypes.REFRESH_CANCELLED, layer,
				eventError.getExceptionType(), eventError.getArguments());
	}

	private String getThemeInspireIdFromLayer(Event evt) {

		ThemeInspireDTO themeInspireDTO = ((LayerEvent) evt).getLayer().getThemeInspire();

		return themeInspireDTO != null ? themeInspireDTO.getId() : null;
	}
}