Commit 7e54d0b1 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade streams para select/deselect y clear

parent ef8b39e8
Loading
Loading
Loading
Loading
+418 −0
Original line number Diff line number Diff line
package es.redmic.commandslib.usersettings.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

/*-
 * #%L
 * commands-lib
 * %%
 * Copyright (C) 2019 REDMIC Project / Server
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.mapstruct.factory.Mappers;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
import es.redmic.commandslib.streaming.common.BaseStreams;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamUtils;
import es.redmic.exception.common.ExceptionType;
import es.redmic.usersettingslib.dto.SelectionDTO;
import es.redmic.usersettingslib.dto.SettingsDTO;
import es.redmic.usersettingslib.events.SettingsEventFactory;
import es.redmic.usersettingslib.events.SettingsEventTypes;
import es.redmic.usersettingslib.events.common.SelectionEvent;
import es.redmic.usersettingslib.events.common.SettingsEvent;
import es.redmic.usersettingslib.mapper.SettingsMapper;

public class SettingsEventStreams extends BaseStreams {

	protected StreamsBuilder builder = new StreamsBuilder();

	protected String snapshotTopicSuffix = "-snapshot";

	protected String snapshotTopic;

	public SettingsEventStreams(StreamConfig config, AlertService alertService) {
		super(config, alertService);
		snapshotTopic = topic + snapshotTopicSuffix;
		init();
	}

	@Override
	protected KafkaStreams processStreams() {

		KStream<String, Event> events = builder.stream(topic);

		KStream<String, Event> snapshotEvents = builder.stream(snapshotTopic);

		// Table filtrado por eventos finales (Siempre el último evento)
		KTable<String, Event> snapshotKTable = snapshotEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Reenvia eventos snapshot al topic correspondiente
		forwardSnapshotEvents(events);

		processPartialSelect(events, snapshotKTable);

		processSelectConfirm(events);

		processSelectFailedStream(events, snapshotKTable);

		processPartialDeselect(events, snapshotKTable);

		processDeselectConfirm(events);

		processDeselectFailedStream(events, snapshotKTable);

		processPartialClearSelection(events, snapshotKTable);

		processClearSelectionConfirm(events);

		processClearSelectionFailedStream(events, snapshotKTable);

		return new KafkaStreams(builder.build(),
				StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry));
	}

	private void forwardSnapshotEvents(KStream<String, Event> events) {

		events.filter((id, event) -> (SettingsEventTypes.isSnapshot(event.getType()))).to(snapshotTopic);
	}

	// Select

	private void processPartialSelect(KStream<String, Event> events, KTable<String, Event> snapshotEvents) {

		// Stream filtrado por eventos parciales de selección
		KStream<String, Event> partialSelectEvents = events
				.filter((id, event) -> (SettingsEventTypes.PARTIAL_SELECT.equals(event.getType())));

		// Join por id, mandando a kafka el evento de select
		partialSelectEvents
				.leftJoin(snapshotEvents,
						(partialSelectEvent, snapshotEvent) -> getSelectEvent(partialSelectEvent, snapshotEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	private Event getSelectEvent(Event partialSelectEvent, Event snapshotEvent) {

		assert partialSelectEvent.getType().equals(SettingsEventTypes.PARTIAL_SELECT);

		assert (snapshotEvent == null || SettingsEventTypes.isSnapshot(snapshotEvent.getType()));

		SelectionDTO newSelection = ((SelectionEvent) partialSelectEvent).getSelection();

		if (snapshotEvent == null) {
			return SettingsEventFactory.getEvent(partialSelectEvent, SettingsEventTypes.SELECT,
					Mappers.getMapper(SettingsMapper.class).map(newSelection));
		} else {

			SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

			if (!changeSelectionIsGranted(settings, newSelection)) {

				return SettingsEventFactory.getEvent(partialSelectEvent, SettingsEventTypes.SELECT_FAILED,
						ExceptionType.ES_SELECTION_WORK.toString(), null);
			}

			settings.getSelection().addAll(newSelection.getSelection());

			return SettingsEventFactory.getEvent(partialSelectEvent, SettingsEventTypes.SELECT, settings);
		}
	}

	private void processSelectFailedStream(KStream<String, Event> events, KTable<String, Event> snapshotKTable) {

		// Stream filtrado por eventos de fallo al seleccionar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (SettingsEventTypes.SELECT_FAILED.equals(event.getType())));

		// Join por id, mandando a kafka el evento de compensación
		failedEvents.leftJoin(snapshotKTable,
				(failedEvent, snapshotEvent) -> getSelectCancelledEvent(failedEvent, snapshotEvent)).to(topic);
	}

	private Event getSelectCancelledEvent(Event failedEvent, Event snapshotEvent) {

		assert failedEvent.getType().equals(SettingsEventTypes.SELECT_FAILED);

		assert (snapshotEvent == null || SettingsEventTypes.isSnapshot(snapshotEvent.getType()));

		EventError eventError = (EventError) failedEvent;

		SettingsDTO settings = null;

		// Si falló una selección que no existía (equivalente a borrarla)
		if (snapshotEvent != null) {
			settings = ((SettingsEvent) snapshotEvent).getSettings();
		}

		return SettingsEventFactory.getEvent(failedEvent, SettingsEventTypes.SELECT_CANCELLED, settings,
				eventError.getExceptionType(), eventError.getArguments());
	}

	private void processSelectConfirm(KStream<String, Event> events) {

		// Stream filtrado por eventos de confirmación al seleccionar
		KStream<String, Event> selectConfirmedEvents = events
				.filter((id, event) -> (SettingsEventTypes.SELECT_CONFIRMED.equals(event.getType())));

		// Table filtrado por eventos de petición de selección (Siempre el último
		// evento)
		KTable<String, Event> selectRequestEvents = events
				.filter((id, event) -> (SettingsEventTypes.SELECT.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		selectConfirmedEvents
				.join(selectRequestEvents,
						(confirmedEvent, requestEvent) -> getSelectedEvent(confirmedEvent, requestEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	private Event getSelectedEvent(Event confirmedEvent, Event requestEvent) {

		assert confirmedEvent.getType().equals(SettingsEventTypes.SELECT_CONFIRMED);

		assert requestEvent.getType().equals(SettingsEventTypes.SELECT);

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}

		SettingsDTO settings = ((SettingsEvent) requestEvent).getSettings();

		return SettingsEventFactory.getEvent(requestEvent, SettingsEventTypes.SELECTED, settings);
	}

	// Deselect

	private void processPartialDeselect(KStream<String, Event> events, KTable<String, Event> snapshotEvents) {

		// Stream filtrado por eventos parciales de deselección
		KStream<String, Event> partialDeselectEvents = events
				.filter((id, event) -> (SettingsEventTypes.PARTIAL_DESELECT.equals(event.getType())));

		// Join por id, mandando a kafka el evento de select
		partialDeselectEvents
				.join(snapshotEvents,
						(partialDeselectEvent, snapshotEvent) -> getDeselectEvent(partialDeselectEvent, snapshotEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	private Event getDeselectEvent(Event partialDeselectEvent, Event snapshotEvent) {

		assert partialDeselectEvent.getType().equals(SettingsEventTypes.PARTIAL_DESELECT);

		assert SettingsEventTypes.isSnapshot(snapshotEvent.getType());

		SelectionDTO newSelection = ((SelectionEvent) partialDeselectEvent).getSelection();

		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

		if (!changeSelectionIsGranted(settings, newSelection)) {

			return SettingsEventFactory.getEvent(partialDeselectEvent, SettingsEventTypes.DESELECT_FAILED,
					ExceptionType.ES_SELECTION_WORK.toString(), null);
		}

		settings.getSelection().removeAll(newSelection.getSelection());

		return SettingsEventFactory.getEvent(partialDeselectEvent, SettingsEventTypes.DESELECT, settings);
	}

	private void processDeselectConfirm(KStream<String, Event> events) {

		// Stream filtrado por eventos de confirmación al deseleccionar
		KStream<String, Event> deselectConfirmedEvents = events
				.filter((id, event) -> (SettingsEventTypes.DESELECT_CONFIRMED.equals(event.getType())));

		// Table filtrado por eventos de petición de deselección (Siempre el último
		// evento)
		KTable<String, Event> deselectRequestEvents = events
				.filter((id, event) -> (SettingsEventTypes.DESELECT.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		deselectConfirmedEvents
				.join(deselectRequestEvents,
						(confirmedEvent, requestEvent) -> getDeselectedEvent(confirmedEvent, requestEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	private Event getDeselectedEvent(Event confirmedEvent, Event requestEvent) {

		assert confirmedEvent.getType().equals(SettingsEventTypes.DESELECT_CONFIRMED);

		assert requestEvent.getType().equals(SettingsEventTypes.DESELECT);

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}

		SettingsDTO settings = ((SettingsEvent) requestEvent).getSettings();

		return SettingsEventFactory.getEvent(requestEvent, SettingsEventTypes.DESELECTED, settings);
	}

	private void processDeselectFailedStream(KStream<String, Event> events, KTable<String, Event> snapshotKTable) {

		// Stream filtrado por eventos de fallo al deseleccionar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (SettingsEventTypes.DESELECT_FAILED.equals(event.getType())));

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(snapshotKTable,
						(failedEvent, snapshotEvent) -> getDeselectCancelledEvent(failedEvent, snapshotEvent))
				.to(topic);

	}

	private Event getDeselectCancelledEvent(Event failedEvent, Event snapshotEvent) {

		assert failedEvent.getType().equals(SettingsEventTypes.DESELECT_FAILED);

		assert SettingsEventTypes.isSnapshot(snapshotEvent.getType());

		EventError eventError = (EventError) failedEvent;

		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

		return SettingsEventFactory.getEvent(failedEvent, SettingsEventTypes.DESELECT_CANCELLED, settings,
				eventError.getExceptionType(), eventError.getArguments());
	}

	// Clear

	private void processPartialClearSelection(KStream<String, Event> events, KTable<String, Event> snapshotEvents) {

		// Stream filtrado por eventos parciales de clear
		KStream<String, Event> partialClearEvents = events
				.filter((id, event) -> (SettingsEventTypes.PARTIAL_CLEAR_SELECTION.equals(event.getType())));

		// Join por id, mandando a kafka el evento de select
		partialClearEvents
				.join(snapshotEvents,
						(partialClearEvent, snapshotEvent) -> getClearEvent(partialClearEvent, snapshotEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	private Event getClearEvent(Event partialClearEvent, Event snapshotEvent) {

		assert partialClearEvent.getType().equals(SettingsEventTypes.PARTIAL_CLEAR_SELECTION);

		assert SettingsEventTypes.isSnapshot(snapshotEvent.getType());

		SelectionDTO newSelection = ((SelectionEvent) partialClearEvent).getSelection();

		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

		if (!changeSelectionIsGranted(settings, newSelection)) {

			return SettingsEventFactory.getEvent(partialClearEvent, SettingsEventTypes.CLEAR_SELECTION_FAILED,
					ExceptionType.ES_SELECTION_WORK.toString(), null);
		}

		settings.getSelection().clear();

		return SettingsEventFactory.getEvent(partialClearEvent, SettingsEventTypes.CLEAR_SELECTION, settings);
	}

	private void processClearSelectionConfirm(KStream<String, Event> events) {

		// Stream filtrado por eventos de confirmación al limpiar selección
		KStream<String, Event> clearSelectionConfirmedEvents = events
				.filter((id, event) -> (SettingsEventTypes.CLEAR_SELECTION_CONFIRMED.equals(event.getType())));

		// Table filtrado por eventos de petición de limpiar selección (Siempre el
		// último evento)
		KTable<String, Event> clearSelectionRequestEvents = events
				.filter((id, event) -> (SettingsEventTypes.CLEAR_SELECTION.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		clearSelectionConfirmedEvents
				.join(clearSelectionRequestEvents,
						(confirmedEvent, requestEvent) -> getSelectionClearedEvent(confirmedEvent, requestEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	private Event getSelectionClearedEvent(Event confirmedEvent, Event requestEvent) {

		assert confirmedEvent.getType().equals(SettingsEventTypes.CLEAR_SELECTION_CONFIRMED);

		assert requestEvent.getType().equals(SettingsEventTypes.CLEAR_SELECTION);

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}

		SettingsDTO settings = ((SettingsEvent) requestEvent).getSettings();

		return SettingsEventFactory.getEvent(requestEvent, SettingsEventTypes.SELECTION_CLEARED, settings);
	}

	private void processClearSelectionFailedStream(KStream<String, Event> events,
			KTable<String, Event> snapshotKTable) {

		// Stream filtrado por eventos de fallo al limpiar selección
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (SettingsEventTypes.CLEAR_SELECTION_FAILED.equals(event.getType())));

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(snapshotKTable,
						(failedEvent, snapshotEvent) -> getClearSelectionCancelledEvent(failedEvent, snapshotEvent))
				.to(topic);
	}

	private Event getClearSelectionCancelledEvent(Event failedEvent, Event snapshotEvent) {

		assert failedEvent.getType().equals(SettingsEventTypes.CLEAR_SELECTION_FAILED);

		assert SettingsEventTypes.isSnapshot(snapshotEvent.getType());

		EventError eventError = (EventError) failedEvent;

		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

		return SettingsEventFactory.getEvent(failedEvent, SettingsEventTypes.CLEAR_SELECTION_CANCELLED, settings,
				eventError.getExceptionType(), eventError.getArguments());
	}

	private boolean changeSelectionIsGranted(SettingsDTO settings, SelectionDTO newSelection) {

		if (!(settings.getService().equals(newSelection.getService())
				&& settings.getUserId().equals(newSelection.getUserId()) && !settings.getShared()
				&& (settings.getName() == null))) {

			logger.error(
					"Imposible modificar la selección de trabajo. No cumple alguna de las restricciones establecidas.");

			return false;
		}
		return true;
	}

	@Override
	protected void postProcessStreams() {
	}
}