Commit 98591426 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade steams/handlers para manejar nuevos evento

De esta manera al clonar se guarda una copia de settings y se actualiza
la fecha de acceso de la original
parent 672894d6
Loading
Loading
Loading
Loading
+54 −0
Original line number Diff line number Diff line
@@ -38,10 +38,12 @@ import es.redmic.commandslib.streaming.common.StreamConfig.Builder;
import es.redmic.commandslib.usersettings.aggregate.PersistenceAggregate;
import es.redmic.commandslib.usersettings.aggregate.SelectionAggregate;
import es.redmic.commandslib.usersettings.commands.ClearCommand;
import es.redmic.commandslib.usersettings.commands.CloneSettingsCommand;
import es.redmic.commandslib.usersettings.commands.DeleteSettingsCommand;
import es.redmic.commandslib.usersettings.commands.DeselectCommand;
import es.redmic.commandslib.usersettings.commands.SaveSettingsCommand;
import es.redmic.commandslib.usersettings.commands.SelectCommand;
import es.redmic.commandslib.usersettings.commands.UpdateSettingsAccessedDateCommand;
import es.redmic.commandslib.usersettings.commands.UpdateSettingsCommand;
import es.redmic.commandslib.usersettings.statestore.SettingsStateStore;
import es.redmic.commandslib.usersettings.streams.SettingsEventStreams;
@@ -53,6 +55,7 @@ import es.redmic.usersettingslib.events.SettingsEventTypes;
import es.redmic.usersettingslib.events.clearselection.ClearSelectionCancelledEvent;
import es.redmic.usersettingslib.events.clearselection.PartialClearSelectionEvent;
import es.redmic.usersettingslib.events.clearselection.SelectionClearedEvent;
import es.redmic.usersettingslib.events.clone.CloneSettingsEvent;
import es.redmic.usersettingslib.events.delete.CheckDeleteSettingsEvent;
import es.redmic.usersettingslib.events.delete.DeleteSettingsCancelledEvent;
import es.redmic.usersettingslib.events.delete.DeleteSettingsCheckedEvent;
@@ -67,6 +70,7 @@ import es.redmic.usersettingslib.events.save.SettingsSavedEvent;
import es.redmic.usersettingslib.events.select.PartialSelectEvent;
import es.redmic.usersettingslib.events.select.SelectCancelledEvent;
import es.redmic.usersettingslib.events.select.SelectedEvent;
import es.redmic.usersettingslib.events.update.UpdateSettingsAccessedDateEvent;

@Component
@ConditionalOnProperty(name = "redmic.user-settings.enabled", havingValue = "true")
@@ -329,6 +333,56 @@ public class SettingsCommandHandler extends CommandHandler {
		return getResult(event.getSessionId(), completableFuture);
	}

	public SettingsDTO clone(CloneSettingsCommand cmd) {

		PersistenceAggregate agg = new PersistenceAggregate(settingsStateStore);

		// Se procesa el comando, obteniendo el evento generado
		logger.debug("Procesando CloneSettingsCommand");

		CloneSettingsEvent event = agg.process(cmd);

		// Si no se genera evento significa que no se debe aplicar
		if (event == null)
			return null;

		String userId = userService.getUserId();
		event.setUserId(userId);
		event.getPersistence().setUserId(userId);

		// Se aplica el evento
		agg.apply(event);

		logger.debug("Aplicado evento: " + event.getType());

		// Crea la espera hasta que se responda con evento completado
		CompletableFuture<SettingsDTO> completableFuture = getCompletableFeature(event.getSessionId());

		// Emite evento para enviar a kafka
		publishToKafka(event, settingsTopic);

		updateSettingsAccessedDate(new UpdateSettingsAccessedDateCommand(cmd.getPersistence().getSettingsId()));

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
	}

	public void updateSettingsAccessedDate(UpdateSettingsAccessedDateCommand cmd) {

		PersistenceAggregate agg = new PersistenceAggregate(settingsStateStore);

		// Se procesa el comando, obteniendo el evento generado
		logger.debug("Procesando UpdateSettingsAccessedDateCommand");

		UpdateSettingsAccessedDateEvent event = agg.process(cmd);

		// Si no se genera evento significa que no se debe aplicar
		if (event == null)
			logger.error("Imposible actualizar fecha de accedido para el item " + cmd.getSettingsId());

		publishToKafka(event, settingsTopic);
	}

	// Select

	@KafkaHandler
+73 −0
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamsBuilder;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.joda.time.DateTime;
import org.mapstruct.factory.Mappers;

import es.redmic.brokerlib.alert.AlertService;
@@ -43,10 +44,12 @@ import es.redmic.usersettingslib.dto.SelectionDTO;
import es.redmic.usersettingslib.dto.SettingsDTO;
import es.redmic.usersettingslib.events.SettingsEventFactory;
import es.redmic.usersettingslib.events.SettingsEventTypes;
import es.redmic.usersettingslib.events.clone.CloneSettingsEvent;
import es.redmic.usersettingslib.events.common.SelectionEvent;
import es.redmic.usersettingslib.events.common.SettingsEvent;
import es.redmic.usersettingslib.events.delete.CheckDeleteSettingsEvent;
import es.redmic.usersettingslib.events.save.PartialSaveSettingsEvent;
import es.redmic.usersettingslib.events.update.UpdateSettingsAccessedDateEvent;
import es.redmic.usersettingslib.mapper.SettingsMapper;

public class SettingsEventStreams extends BaseStreams {
@@ -104,6 +107,10 @@ public class SettingsEventStreams extends BaseStreams {

		processDeleteFailedStream(events, snapshotKTable);

		processCloneSettings(events, snapshotKTable);

		processUpdateSettingsAccessedDate(events, snapshotKTable);

		return new KafkaStreams(builder.build(),
				StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry));
	}
@@ -519,6 +526,72 @@ public class SettingsEventStreams extends BaseStreams {
		return SettingsEventFactory.getEvent(confirmedEvent, SettingsEventTypes.SAVED, settings);
	}

	// Clone

	private void processCloneSettings(KStream<String, Event> events, KTable<String, Event> snapshotKTable) {

		KStream<String, Event> cloneEvents = events
				.filter((id, event) -> (SettingsEventTypes.CLONE.equals(event.getType())))
				.selectKey((k, v) -> ((CloneSettingsEvent) v).getPersistence().getSettingsId());

		// Envía evento para guardar nuevas settings de trabajo
		cloneEvents.leftJoin(snapshotKTable,
				(cloneEvent, snapshotEvent) -> getSaveSettingsByCloneEvent((CloneSettingsEvent) cloneEvent,
						(SettingsEvent) snapshotEvent))
				.selectKey((k, v) -> v.getAggregateId()).to(topic);
	}

	private Event getSaveSettingsByCloneEvent(CloneSettingsEvent cloneEvent, SettingsEvent snapshotEvent) {

		if (snapshotEvent == null) {
			// TODO: generar nueva excepción. Si es necesario, añadir argumentos
			return SettingsEventFactory.getEvent(cloneEvent, SettingsEventTypes.SAVE_FAILED,
					ExceptionType.ES_SELECTION_WORK.toString(), null);
		}

		SettingsDTO sourceSettings = snapshotEvent.getSettings();
		PersistenceDTO persistenceInfo = cloneEvent.getPersistence();

		sourceSettings.setId(persistenceInfo.getId());
		sourceSettings.setName(null);
		sourceSettings.setShared(false);
		sourceSettings.setUserId(cloneEvent.getUserId());

		return SettingsEventFactory.getEvent(cloneEvent, SettingsEventTypes.SAVE, sourceSettings);
	}

	// UpdateSettingsAccessedDate

	private void processUpdateSettingsAccessedDate(KStream<String, Event> events,
			KTable<String, Event> snapshotKTable) {

		KStream<String, Event> updateAccessedDateEvents = events
				.filter((id, event) -> (SettingsEventTypes.UPDATE_ACCESSED_DATE.equals(event.getType())));

		// Envía evento para actualizar la fecha de acceso de las settings copiada
		updateAccessedDateEvents.leftJoin(snapshotKTable,
				(updateAccessedDateEvent, snapshotEvent) -> getUpdateSettingsAccessedDateEvent(
						(UpdateSettingsAccessedDateEvent) updateAccessedDateEvent, (SettingsEvent) snapshotEvent))
				.to(topic);

	}

	private Event getUpdateSettingsAccessedDateEvent(UpdateSettingsAccessedDateEvent updateAccessedDateEvent,
			SettingsEvent snapshotEvent) {

		if (snapshotEvent == null) {
			// TODO: generar nueva excepción. Si es necesario, añadir argumentos
			return SettingsEventFactory.getEvent(updateAccessedDateEvent, SettingsEventTypes.SAVE_FAILED,
					ExceptionType.ES_SELECTION_WORK.toString(), null);
		}

		SettingsDTO sourceSettings = snapshotEvent.getSettings();

		sourceSettings.setAccessed(DateTime.now());

		return SettingsEventFactory.getEvent(updateAccessedDateEvent, SettingsEventTypes.SAVE, sourceSettings);
	}

	// Delete

	private void processCheckDeleteSettings(KStream<String, Event> events, KTable<String, Event> snapshotKTable) {