Commit 8b89447a authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade stream de enriquecer layer con themeinspire

parent afbbb4bc
Loading
Loading
Loading
Loading
+44 −5
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@ import es.redmic.atlaslib.events.layer.LayerEventFactory;
import es.redmic.atlaslib.events.layer.LayerEventTypes;
import es.redmic.atlaslib.events.layer.common.LayerEvent;
import es.redmic.atlaslib.events.layer.create.CreateLayerEnrichedEvent;
import es.redmic.atlaslib.events.layer.update.UpdateLayerEnrichedEvent;
import es.redmic.atlaslib.events.themeinspire.ThemeInspireEventTypes;
import es.redmic.atlaslib.events.themeinspire.common.ThemeInspireEvent;
import es.redmic.brokerlib.alert.AlertService;
@@ -143,6 +144,49 @@ public class LayerEventStreams extends EventSourcingStreams {
		return LayerEventFactory.getEvent(confirmedEvent, LayerEventTypes.CREATED, layer);
	}

	/*
	 * Función que a partir de los eventos de tipo UpdateEnrich y globalKTable de
	 * las relaciones, enriquece el item antes de mandarlo a modificar
	 * 
	 */

	@Override
	protected void processEnrichUpdateSteam(KStream<String, Event> events) {

		KStream<String, Event> enrichUpdateEvents = events
				.filter((id, event) -> (EventTypes.ENRICH_UPDATE.equals(event.getType())))
				.selectKey((k, v) -> getThemeInspireIdFromLayer(v));

		enrichUpdateEvents
				.leftJoin(themeInspire, (k, v) -> k, (enrichUpdateEvent,
						themeInspireEvent) -> getEnrichUpdateResultEvent(enrichUpdateEvent, themeInspireEvent))
				.selectKey((k, v) -> v.getAggregateId()).to(topic);
	}

	private Event getEnrichUpdateResultEvent(Event enrichUpdateEvents, Event themeInspireEvent) {

		UpdateLayerEnrichedEvent event = (UpdateLayerEnrichedEvent) LayerEventFactory.getEvent(enrichUpdateEvents,
				LayerEventTypes.UPDATE_ENRICHED, ((LayerEvent) enrichUpdateEvents).getLayer());

		if (themeInspireEvent != null && !themeInspireEvent.getType().equals(ThemeInspireEventTypes.DELETED)) {
			((LayerEvent) event).getLayer().setThemeInspire(((ThemeInspireEvent) themeInspireEvent).getThemeInspire());
		} else {

			String themeInspireId = getThemeInspireIdFromLayer(enrichUpdateEvents);

			logger.warn(
					"Intentando enriquecer " + enrichUpdateEvents.getAggregateId() + " con un elemento que no existe");

			Map<String, String> arguments = new HashMap<>();
			arguments.put("themeInspire.id", themeInspireId);

			return LayerEventFactory.getEvent(enrichUpdateEvents, LayerEventTypes.UPDATE_FAILED,
					ExceptionType.ITEM_NOT_FOUND.toString(), arguments);
		}

		return event;
	}

	/**
	 * Función que apartir del evento de confirmación de la vista y del evento
	 * update (petición de modificación), si todo es correcto, genera evento updated
@@ -222,11 +266,6 @@ public class LayerEventStreams extends EventSourcingStreams {
				eventError.getExceptionType(), eventError.getArguments());
	}

	@Override
	protected void processEnrichUpdateSteam(KStream<String, Event> events) {
		// En este caso no hay enriquecimiento
	}

	@Override
	protected void processPartialUpdatedStream(KStream<String, Event> layerEvents,
			KStream<String, Event> updateConfirmedEvents) {