Commit 89563477 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade streams para procesar guardado y borrado

parent 8e97263d
Loading
Loading
Loading
Loading
+137 −1
Original line number Diff line number Diff line
@@ -34,12 +34,15 @@ import es.redmic.commandslib.streaming.common.BaseStreams;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamUtils;
import es.redmic.exception.common.ExceptionType;
import es.redmic.usersettingslib.dto.PersistenceDTO;
import es.redmic.usersettingslib.dto.SelectionDTO;
import es.redmic.usersettingslib.dto.SettingsDTO;
import es.redmic.usersettingslib.events.SettingsEventFactory;
import es.redmic.usersettingslib.events.SettingsEventTypes;
import es.redmic.usersettingslib.events.common.SelectionEvent;
import es.redmic.usersettingslib.events.common.SettingsEvent;
import es.redmic.usersettingslib.events.delete.CheckDeleteSettingsEvent;
import es.redmic.usersettingslib.events.save.PartialSaveSettingsEvent;
import es.redmic.usersettingslib.mapper.SettingsMapper;

public class SettingsEventStreams extends BaseStreams {
@@ -87,6 +90,14 @@ public class SettingsEventStreams extends BaseStreams {

		processClearSelectionFailedStream(events, snapshotKTable);

		processPartialSaveSettings(events, snapshotKTable);

		processSaveSettingsConfirm(events);

		processSaveSettingsFailedStream(events, snapshotKTable);

		processCheckDeleteSettings(events, snapshotKTable);

		return new KafkaStreams(builder.build(),
				StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry));
	}
@@ -326,7 +337,7 @@ public class SettingsEventStreams extends BaseStreams {
		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

		if (!changeSelectionIsGranted(settings, newSelection)) {

			// TODO: generar nueva excepción. Si es necesario, añadir argumentos
			return SettingsEventFactory.getEvent(partialClearEvent, SettingsEventTypes.CLEAR_SELECTION_FAILED,
					ExceptionType.ES_SELECTION_WORK.toString(), null);
		}
@@ -398,6 +409,131 @@ public class SettingsEventStreams extends BaseStreams {
				eventError.getExceptionType(), eventError.getArguments());
	}

	// Save

	private void processPartialSaveSettings(KStream<String, Event> events, KTable<String, Event> snapshotKTable) {

		KStream<String, Event> partialEvents = events
				.filter((id, event) -> (SettingsEventTypes.PARTIAL_SAVE.equals(event.getType())))
				.selectKey((k, v) -> ((PartialSaveSettingsEvent) v).getPersistence().getSettingsId());

		partialEvents.leftJoin(snapshotKTable,
				(partialEvent, snapshotEvent) -> getSaveSettingsEvent((PartialSaveSettingsEvent) partialEvent,
						(SettingsEvent) snapshotEvent))
				.selectKey((k, v) -> v.getAggregateId()).to(topic);

	}

	private Event getSaveSettingsEvent(PartialSaveSettingsEvent partialEvent, SettingsEvent snapshotEvent) {

		if (snapshotEvent == null) {
			// TODO: generar nueva excepción. Si es necesario, añadir argumentos
			return SettingsEventFactory.getEvent(partialEvent, SettingsEventTypes.SAVE_FAILED,
					ExceptionType.ES_SELECTION_WORK.toString(), null);
		}

		SettingsDTO sourceSettings = snapshotEvent.getSettings();
		PersistenceDTO persistenceInfo = partialEvent.getPersistence();

		sourceSettings.setId(persistenceInfo.getId());
		sourceSettings.setName(persistenceInfo.getName());
		sourceSettings.setShared(persistenceInfo.getShared());

		return SettingsEventFactory.getEvent(partialEvent, SettingsEventTypes.SAVE, sourceSettings);
	}

	private void processSaveSettingsFailedStream(KStream<String, Event> events, KTable<String, Event> snapshotKTable) {

		// Stream filtrado por eventos de fallo al guardar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (SettingsEventTypes.SAVE_FAILED.equals(event.getType())));

		// Join por id, mandando a kafka el evento de compensación
		failedEvents.leftJoin(snapshotKTable,
				(failedEvent, snapshotEvent) -> getSaveCancelledEvent(failedEvent, snapshotEvent)).to(topic);

	}

	private Event getSaveCancelledEvent(Event failedEvent, Event snapshotEvent) {

		assert failedEvent.getType().equals(SettingsEventTypes.SAVE_FAILED);

		assert SettingsEventTypes.isSnapshot(snapshotEvent.getType());

		EventError eventError = (EventError) failedEvent;

		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

		return SettingsEventFactory.getEvent(failedEvent, SettingsEventTypes.SAVE_CANCELLED, settings,
				eventError.getExceptionType(), eventError.getArguments());
	}

	private void processSaveSettingsConfirm(KStream<String, Event> events) {

		// Stream filtrado por eventos de confirmación al guardar
		KStream<String, Event> selectConfirmedEvents = events
				.filter((id, event) -> (SettingsEventTypes.SAVE_CONFIRMED.equals(event.getType())));

		// Table filtrado por eventos de petición de guardar selección (Siempre el
		// último
		// evento)
		KTable<String, Event> saveRequestEvents = events
				.filter((id, event) -> (SettingsEventTypes.SAVE.equals(event.getType()))).groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de éxito
		selectConfirmedEvents
				.join(saveRequestEvents,
						(confirmedEvent, requestEvent) -> getSettingsSavedEvent(confirmedEvent, requestEvent))
				.filter((k, v) -> (v != null)).to(topic);

	}

	private Event getSettingsSavedEvent(Event confirmedEvent, Event requestEvent) {

		assert confirmedEvent.getType().equals(SettingsEventTypes.SAVE_CONFIRMED);

		assert requestEvent.getType().equals(SettingsEventTypes.SAVE);

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}

		SettingsDTO settings = ((SettingsEvent) requestEvent).getSettings();

		return SettingsEventFactory.getEvent(requestEvent, SettingsEventTypes.SAVED, settings);
	}

	// Delete

	private void processCheckDeleteSettings(KStream<String, Event> events, KTable<String, Event> snapshotKTable) {

		KStream<String, Event> checkDeleteEvents = events
				.filter((id, event) -> (SettingsEventTypes.CHECK_DELETE.equals(event.getType())));

		// Join por id, mandando a kafka el evento de compensación
		checkDeleteEvents
				.join(snapshotKTable,
						(checkDeleteEvent, snapshotEvent) -> getCheckResultEvent(checkDeleteEvent, snapshotEvent))
				.to(topic);

	}

	private Event getCheckResultEvent(Event checkDeleteEvent, Event snapshotEvent) {

		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

		CheckDeleteSettingsEvent evt = (CheckDeleteSettingsEvent) checkDeleteEvent;

		if (!settings.getShared() && evt.getUserId().equals(settings.getUserId())) {
			return SettingsEventFactory.getEvent(evt, SettingsEventTypes.DELETE_CHECKED);
		}

		// TODO: generar nueva excepción. Si es necesario, añadir argumentos
		return SettingsEventFactory.getEvent(evt, SettingsEventTypes.CHECK_DELETE_FAILED,
				ExceptionType.ES_SELECTION_WORK.toString(), null);
	}

	private boolean changeSelectionIsGranted(SettingsDTO settings, SelectionDTO newSelection) {

		if (!(settings.getService().equals(newSelection.getService())