Commit 12388aef authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade stream para generar eventos de refresco

De esta forma se generan los eventos de refrescado y refresco cancelado
a partir de otros eventos
parent c24fc644
Loading
Loading
Loading
Loading
+85 −0
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import java.util.Arrays;
 */

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import es.redmic.atlaslib.dto.layer.LayerDTO;
import es.redmic.atlaslib.events.layer.LayerEventFactory;
@@ -186,5 +187,89 @@ public class LayerEventStreams extends EventSourcingStreams {

	@Override
	protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) {

		processRefreshSuccessStream(events);

		processRefreshFailedStream(events, snapshotEvents);
	}

	protected void processRefreshSuccessStream(KStream<String, Event> events) {

		// Stream filtrado por eventos de confirmación al refrescar
		KStream<String, Event> refreshConfirmedEvents = events
				.filter((id, event) -> (LayerEventTypes.REFRESH_CONFIRMED.equals(event.getType())));

		// Table filtrado por eventos de petición de refrescar (Siempre el último
		// evento)
		KTable<String, Event> refreshRequestEvents = events
				.filter((id, event) -> (LayerEventTypes.REFRESH.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		refreshConfirmedEvents
				.join(refreshRequestEvents,
						(confirmedEvent, requestEvent) -> geRefreshedEvent(confirmedEvent, requestEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	/**
	 * Función que apartir del evento de confirmación de la vista y del evento
	 * refresh (petición de modificación), si todo es correcto, genera evento
	 * updated
	 */

	protected Event geRefreshedEvent(Event confirmedEvent, Event requestEvent) {

		assert requestEvent.getType().equals(LayerEventTypes.REFRESH);

		assert confirmedEvent.getType().equals(LayerEventTypes.REFRESH_CONFIRMED);

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}

		// Se obtiene la capa de la vista que es la que contiene todos los datos
		LayerDTO layer = ((LayerEvent) confirmedEvent).getLayer();

		return LayerEventFactory.getEvent(requestEvent, LayerEventTypes.REFRESHED, layer);
	}

	/**
	 * Función que a partir del último evento correcto y el evento fallido al
	 * refrescar, envía evento de cancelación
	 */

	protected void processRefreshFailedStream(KStream<String, Event> events, KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al modificar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (LayerEventTypes.REFRESH_FAILED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getRefreshCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

	/**
	 * Función que a partir del evento fallido y el último evento correcto, genera
	 * evento RefreshCancelled
	 */

	protected Event getRefreshCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert failedEvent.getType().equals(LayerEventTypes.REFRESH_FAILED);

		assert LayerEventTypes.isSnapshot(lastSuccessEvent.getType());

		LayerDTO layer = ((LayerEvent) lastSuccessEvent).getLayer();

		EventError eventError = (EventError) failedEvent;

		return LayerEventFactory.getEvent(failedEvent, LayerEventTypes.REFRESH_CANCELLED, layer,
				eventError.getExceptionType(), eventError.getArguments());
	}
}