Commit 53374da6 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade streams para borrado de settings + tests

parent 79969c28
Loading
Loading
Loading
Loading
+34 −2
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@ import org.mapstruct.factory.Mappers;
import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
import es.redmic.brokerlib.avro.common.EventTypes;
import es.redmic.commandslib.streaming.common.BaseStreams;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamUtils;
@@ -98,6 +99,8 @@ public class SettingsEventStreams extends BaseStreams {

		processCheckDeleteSettings(events, snapshotKTable);

		processDeleteFailedStream(events, snapshotKTable);

		return new KafkaStreams(builder.build(),
				StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry));
	}
@@ -458,11 +461,15 @@ public class SettingsEventStreams extends BaseStreams {

		assert failedEvent.getType().equals(SettingsEventTypes.SAVE_FAILED);

		assert SettingsEventTypes.isSnapshot(snapshotEvent.getType());
		assert snapshotEvent == null || SettingsEventTypes.isSnapshot(snapshotEvent.getType());

		EventError eventError = (EventError) failedEvent;

		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();
		SettingsDTO settings = null;

		if (snapshotEvent != null) {
			settings = ((SettingsEvent) snapshotEvent).getSettings();
		}

		return SettingsEventFactory.getEvent(failedEvent, SettingsEventTypes.SAVE_CANCELLED, settings,
				eventError.getExceptionType(), eventError.getArguments());
@@ -534,6 +541,31 @@ public class SettingsEventStreams extends BaseStreams {
				ExceptionType.ES_SELECTION_WORK.toString(), null);
	}

	private void processDeleteFailedStream(KStream<String, Event> events, KTable<String, Event> snapshotKTable) {

		// Stream filtrado por eventos de fallo al borrar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (EventTypes.DELETE_FAILED.equals(event.getType())));

		// Join por id, mandando a kafka el evento de compensación
		failedEvents.join(snapshotKTable,
				(failedEvent, snapshotEvent) -> getDeleteCancelledEvent(failedEvent, snapshotEvent)).to(topic);
	}

	private Event getDeleteCancelledEvent(Event failedEvent, Event snapshotEvent) {

		assert failedEvent.getType().equals(SettingsEventTypes.DELETE_FAILED);

		assert SettingsEventTypes.isSnapshot(snapshotEvent.getType());

		SettingsDTO settings = ((SettingsEvent) snapshotEvent).getSettings();

		EventError eventError = (EventError) failedEvent;

		return SettingsEventFactory.getEvent(failedEvent, SettingsEventTypes.DELETE_CANCELLED, settings,
				eventError.getExceptionType(), eventError.getArguments());
	}

	private boolean changeSelectionIsGranted(SettingsDTO settings, SelectionDTO newSelection) {

		if (!(settings.getService().equals(newSelection.getService())
+144 −1
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ import static org.junit.Assert.assertNull;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

@@ -15,6 +16,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaHandler;
@@ -24,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;

import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.usersettings.handler.SettingsCommandHandler;
import es.redmic.exception.data.DeleteItemException;

/*-
 * #%L
@@ -58,6 +61,13 @@ import es.redmic.usersettingslib.events.clearselection.PartialClearSelectionEven
import es.redmic.usersettingslib.events.clearselection.SelectionClearedEvent;
import es.redmic.usersettingslib.events.common.SettingsCancelledEvent;
import es.redmic.usersettingslib.events.common.SettingsEvent;
import es.redmic.usersettingslib.events.delete.CheckDeleteSettingsEvent;
import es.redmic.usersettingslib.events.delete.DeleteSettingsCancelledEvent;
import es.redmic.usersettingslib.events.delete.DeleteSettingsCheckFailedEvent;
import es.redmic.usersettingslib.events.delete.DeleteSettingsCheckedEvent;
import es.redmic.usersettingslib.events.delete.DeleteSettingsConfirmedEvent;
import es.redmic.usersettingslib.events.delete.DeleteSettingsFailedEvent;
import es.redmic.usersettingslib.events.delete.SettingsDeletedEvent;
import es.redmic.usersettingslib.events.deselect.DeselectCancelledEvent;
import es.redmic.usersettingslib.events.deselect.DeselectConfirmedEvent;
import es.redmic.usersettingslib.events.deselect.DeselectEvent;
@@ -217,7 +227,7 @@ public class SettingsCommandHandlerBase extends KafkaBaseIntegrationTest {
	// Envía un evento de error de selección y debe provocar un evento Cancelled sin
	// el item dentro
	@Test
	public void selectFailedEvent_SendCategoryCancelledEventWithoutSettings_IfSelectionNoExist() throws Exception {
	public void selectFailedEvent_SendSelectCancelledEventWithoutSettings_IfSelectionNoExist() throws Exception {

		SelectFailedEvent event = SettingsDataUtil.getSelectFailedEvent(code + "5");

@@ -595,6 +605,113 @@ public class SettingsCommandHandlerBase extends KafkaBaseIntegrationTest {
				((SettingsCancelledEvent) cancelled).getSettings().getSelection());
	}

	// Delete

	// Envía un evento de comprobación de que el elemento puede ser borrado y debe
	// provocar un evento DeleteSettingsCheckedEvent ya que no está compartido
	@Test
	public void checkDeleteSettingsEvent_SendDeleteSettingsCheckedEvent_IfReceivesSuccess()
			throws InterruptedException {

		// Envía saved para meterlo en el stream y lo saca de la cola
		SettingsSavedEvent settingsSavedEvent = SettingsDataUtil.getSettingsSavedEvent(code + "20");
		kafkaTemplate.send(settings_topic, settingsSavedEvent.getAggregateId(), settingsSavedEvent);
		blockingQueue.poll(10, TimeUnit.SECONDS);

		Thread.sleep(1000);

		CheckDeleteSettingsEvent event = SettingsDataUtil.getCheckDeleteSettingsEvent(code + "20");

		kafkaTemplate.send(settings_topic, event.getAggregateId(), event);

		Event confirm = (Event) blockingQueue.poll(60, TimeUnit.SECONDS);

		assertNotNull(confirm);
		assertEquals(SettingsEventTypes.DELETE_CHECKED, confirm.getType());
		assertEquals(event.getAggregateId(), confirm.getAggregateId());
		assertEquals(event.getUserId(), confirm.getUserId());
		assertEquals(event.getSessionId(), confirm.getSessionId());
		assertEquals(event.getVersion(), confirm.getVersion());
	}

	// Envía un evento de comprobación de que el elemento puede ser borrado y debe
	// provocar un evento CheckDeleteSettingsFailedEvent ya que no está compartido
	@Test
	public void checkDeleteSettingsEvent_SendCheckDeleteSettingsFailedEvent_IfSettigsAreShared()
			throws InterruptedException {

		// Envía saved para meterlo en el stream y lo saca de la cola
		SettingsSavedEvent settingsSavedEvent = SettingsDataUtil.getSettingsSavedEvent(code + "21");
		settingsSavedEvent.getSettings().setShared(true);
		kafkaTemplate.send(settings_topic, settingsSavedEvent.getAggregateId(), settingsSavedEvent);
		blockingQueue.poll(10, TimeUnit.SECONDS);

		Thread.sleep(1000);

		CheckDeleteSettingsEvent event = SettingsDataUtil.getCheckDeleteSettingsEvent(code + "21");

		kafkaTemplate.send(settings_topic, event.getAggregateId(), event);

		Event confirm = (Event) blockingQueue.poll(60, TimeUnit.SECONDS);

		assertNotNull(confirm);
		assertEquals(SettingsEventTypes.CHECK_DELETE_FAILED, confirm.getType());
		assertEquals(event.getAggregateId(), confirm.getAggregateId());
		assertEquals(event.getUserId(), confirm.getUserId());
		assertEquals(event.getSessionId(), confirm.getSessionId());
		assertEquals(event.getVersion(), confirm.getVersion());
	}

	// Envía un evento de confirmación de borrado y debe provocar un evento Deleted
	@Test
	public void deleteSettingsConfirmedEvent_SendSettingsDeletedEvent_IfReceivesSuccess() throws InterruptedException {

		DeleteSettingsConfirmedEvent event = SettingsDataUtil.getDeleteSettingsConfirmedEvent(code + "22");

		kafkaTemplate.send(settings_topic, event.getAggregateId(), event);

		Event confirm = (Event) blockingQueue.poll(120, TimeUnit.SECONDS);

		assertNotNull(confirm);
		assertEquals(SettingsEventTypes.DELETED, confirm.getType());
		assertEquals(event.getAggregateId(), confirm.getAggregateId());
		assertEquals(event.getUserId(), confirm.getUserId());
		assertEquals(event.getSessionId(), confirm.getSessionId());
		assertEquals(event.getVersion(), confirm.getVersion());
	}

	// Envía un evento de error de borrado y debe provocar un evento Cancelled con
	// el item dentro
	@Test(expected = DeleteItemException.class)
	public void deleteSettingsFailedEvent_SendSettingsCancelledEvent_IfReceivesSuccess() throws Exception {

		// Envía saved para meterlo en el stream y lo saca de la cola
		SettingsSavedEvent settingsSavedEvent = SettingsDataUtil.getSettingsSavedEvent(code + "23");
		kafkaTemplate.send(settings_topic, settingsSavedEvent.getAggregateId(), settingsSavedEvent);
		blockingQueue.poll(10, TimeUnit.SECONDS);

		Thread.sleep(1000);

		// Envía failed y espera un evento de cancelled con settings original dentro
		DeleteSettingsFailedEvent event = SettingsDataUtil.getDeleteSettingsFailedEvent(code + "23");

		// Añade completableFeature para que se resuelva al recibir el mensaje.
		CompletableFuture<SettingsDTO> completableFuture = Whitebox.invokeMethod(settingsCommandHandler,
				"getCompletableFeature", event.getSessionId(), settingsSavedEvent.getSettings());

		kafkaTemplate.send(settings_topic, event.getAggregateId(), event);

		Event cancelled = (Event) blockingQueue.poll(60, TimeUnit.SECONDS);

		// Obtiene el resultado
		Whitebox.invokeMethod(settingsCommandHandler, "getResult", event.getSessionId(), completableFuture);

		assertNotNull(cancelled);
		assertEquals(SettingsEventTypes.DELETE_CANCELLED, cancelled.getType());
		assertEquals(mapper.writeValueAsString(settingsSavedEvent.getSettings()),
				mapper.writeValueAsString(((DeleteSettingsCancelledEvent) cancelled).getSettings()));
	}

	// Select

	@KafkaHandler
@@ -699,6 +816,32 @@ public class SettingsCommandHandlerBase extends KafkaBaseIntegrationTest {
		blockingQueue.offer(saveSettingsCancelledEvent);
	}

	// Delete

	@KafkaHandler
	public void settingsDeletedEvent(SettingsDeletedEvent settingsDeletedEvent) {

		blockingQueue.offer(settingsDeletedEvent);
	}

	@KafkaHandler
	public void deleteSettingsCancelledEvent(DeleteSettingsCancelledEvent deleteSettingsCancelledEvent) {

		blockingQueue.offer(deleteSettingsCancelledEvent);
	}

	@KafkaHandler
	public void deleteSettingsCheckedEvent(DeleteSettingsCheckedEvent deleteSettingsCheckedEvent) {

		blockingQueue.offer(deleteSettingsCheckedEvent);
	}

	@KafkaHandler
	public void deleteSettingsCheckFailedEvent(DeleteSettingsCheckFailedEvent deleteSettingsCheckFailedEvent) {

		blockingQueue.offer(deleteSettingsCheckFailedEvent);
	}

	@KafkaHandler(isDefault = true)
	public void defaultEvent(Object def) {