Commit 16c9afde authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade stream de eventos de rollback para settings

Añade test
parent 83c57ed6
Loading
Loading
Loading
Loading
+21 −0
Original line number Diff line number Diff line
@@ -111,6 +111,8 @@ public class SettingsEventStreams extends BaseStreams {

		processUpdateSettingsAccessedDate(events, snapshotKTable);

		proccessRollbackStream(events, snapshotKTable);

		return new KafkaStreams(builder.build(),
				StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry));
	}
@@ -649,6 +651,25 @@ public class SettingsEventStreams extends BaseStreams {
				eventError.getExceptionType(), eventError.getArguments());
	}

	private void proccessRollbackStream(KStream<String, Event> events, KTable<String, Event> successEventsTable) {

		// Stream filtrado por eventos pre rollback
		KStream<String, Event> prepareRollbackEvents = events
				.filter((id, event) -> (EventTypes.PREPARE_ROLLBACK.equals(event.getType())));

		// Join por id, mandando a kafka el evento específico
		prepareRollbackEvents.join(successEventsTable,
				(prepareRollbackEvent, lastSuccessEvent) -> getRollbackEvent(prepareRollbackEvent, lastSuccessEvent))
				.to(topic);
	}

	private Event getRollbackEvent(Event prepareRollbackEvent, Event lastSuccessEvent) {

		return SettingsEventFactory.getEvent(prepareRollbackEvent, SettingsEventTypes.ROLLBACK,
				((SettingsEvent) lastSuccessEvent).getSettings());

	}

	private boolean changeSelectionIsGranted(SettingsDTO settings, SelectionDTO newSelection) {

		if (!(settings.getService().equals(newSelection.getService())
+35 −0
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import com.fasterxml.jackson.databind.ObjectMapper;

import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.fail.PrepareRollbackEvent;
import es.redmic.commandslib.usersettings.handler.SettingsCommandHandler;
import es.redmic.exception.common.ExceptionType;
import es.redmic.exception.data.DeleteItemException;
@@ -77,6 +78,7 @@ import es.redmic.usersettingslib.events.deselect.DeselectEvent;
import es.redmic.usersettingslib.events.deselect.DeselectFailedEvent;
import es.redmic.usersettingslib.events.deselect.DeselectedEvent;
import es.redmic.usersettingslib.events.deselect.PartialDeselectEvent;
import es.redmic.usersettingslib.events.fail.SettingsRollbackEvent;
import es.redmic.usersettingslib.events.save.PartialSaveSettingsEvent;
import es.redmic.usersettingslib.events.save.SaveSettingsCancelledEvent;
import es.redmic.usersettingslib.events.save.SaveSettingsConfirmedEvent;
@@ -856,6 +858,31 @@ public class SettingsCommandHandlerBase extends KafkaBaseIntegrationTest {
		assertNotEquals(sourceSettings.getAccessed(), settingsForUpdate.getAccessed());
	}

	// Envía un evento de prepare rollback y debe provocar un evento
	// SettingsRollback con el item dentro
	@Test
	public void prepareRollbackEvent_SendSettingsRollbackEvent_IfReceivesSuccess() throws Exception {

		// Envía created para meterlo en el stream y lo saca de la cola
		SettingsSavedEvent settingsSavedEvent = SettingsDataUtil.getSettingsSavedEvent(code + "37");
		kafkaTemplate.send(settings_topic, settingsSavedEvent.getAggregateId(), settingsSavedEvent);
		blockingQueue.poll(10, TimeUnit.SECONDS);

		PrepareRollbackEvent event = SettingsDataUtil.getPrepareRollbackEvent(code + "37");

		kafkaTemplate.send(settings_topic, event.getAggregateId(), event);

		Event rollback = (Event) blockingQueue.poll(30, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(SettingsEventTypes.ROLLBACK, rollback.getType());
		assertEquals(event.getFailEventType(), ((SettingsRollbackEvent) rollback).getFailEventType());
		assertEquals(settingsSavedEvent.getSettings().getId(),
				((SettingsRollbackEvent) rollback).getLastSnapshotItem().getId());
		assertEquals(settingsSavedEvent.getSettings().getUpdated().getMillis(),
				((SettingsRollbackEvent) rollback).getLastSnapshotItem().getUpdated().getMillis());
	}

	// Select

	@KafkaHandler
@@ -986,6 +1013,14 @@ public class SettingsCommandHandlerBase extends KafkaBaseIntegrationTest {
		blockingQueue.offer(deleteSettingsCheckFailedEvent);
	}

	//

	@KafkaHandler
	public void settingsRollbackEvent(SettingsRollbackEvent settingsRollbackEvent) {

		blockingQueue.offer(settingsRollbackEvent);
	}

	@KafkaHandler(isDefault = true)
	public void defaultEvent(Object def) {