Commit 139a4fbc authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade tests para rollback y rollback fallidos

parent 878b61fe
Loading
Loading
Loading
Loading
+359 −7
Original line number Diff line number Diff line
@@ -22,6 +22,8 @@ package es.redmic.test.atlascommands.integration.category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@@ -53,6 +55,10 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.concurrent.ListenableFuture;

import es.redmic.atlascommands.AtlasCommandsApplication;
import es.redmic.atlascommands.aggregate.CategoryAggregate;
import es.redmic.atlascommands.commands.category.CreateCategoryCommand;
import es.redmic.atlascommands.commands.category.DeleteCategoryCommand;
import es.redmic.atlascommands.commands.category.UpdateCategoryCommand;
import es.redmic.atlascommands.handler.CategoryCommandHandler;
import es.redmic.atlaslib.dto.category.CategoryDTO;
import es.redmic.atlaslib.events.category.CategoryEventTypes;
@@ -67,6 +73,7 @@ import es.redmic.atlaslib.events.category.delete.DeleteCategoryCancelledEvent;
import es.redmic.atlaslib.events.category.delete.DeleteCategoryCheckFailedEvent;
import es.redmic.atlaslib.events.category.delete.DeleteCategoryCheckedEvent;
import es.redmic.atlaslib.events.category.delete.DeleteCategoryConfirmedEvent;
import es.redmic.atlaslib.events.category.delete.DeleteCategoryEvent;
import es.redmic.atlaslib.events.category.delete.DeleteCategoryFailedEvent;
import es.redmic.atlaslib.events.category.fail.CategoryRollbackEvent;
import es.redmic.atlaslib.events.category.update.CategoryUpdatedEvent;
@@ -75,9 +82,15 @@ import es.redmic.atlaslib.events.category.update.UpdateCategoryConfirmedEvent;
import es.redmic.atlaslib.events.category.update.UpdateCategoryEvent;
import es.redmic.atlaslib.events.category.update.UpdateCategoryFailedEvent;
import es.redmic.atlaslib.unit.utils.CategoryDataUtil;
import es.redmic.brokerlib.alert.AlertType;
import es.redmic.brokerlib.alert.Message;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventTypes;
import es.redmic.brokerlib.avro.fail.PrepareRollbackEvent;
import es.redmic.brokerlib.avro.fail.RollbackFailedEvent;
import es.redmic.brokerlib.listener.SendListener;
import es.redmic.commandslib.exceptions.ConfirmationTimeoutException;
import es.redmic.commandslib.exceptions.ItemLockedException;
import es.redmic.exception.data.DeleteItemException;
import es.redmic.exception.data.ItemAlreadyExistException;
import es.redmic.exception.data.ItemNotFoundException;
@@ -109,6 +122,8 @@ public class CategoryCommandHandlerTest extends KafkaBaseIntegrationTest {

	protected static BlockingQueue<Object> blockingQueue;

	protected static BlockingQueue<Object> blockingQueueForAlerts;

	@Autowired
	CategoryCommandHandler categoryCommandHandler;

@@ -123,6 +138,7 @@ public class CategoryCommandHandlerTest extends KafkaBaseIntegrationTest {
	public void setup() {

		blockingQueue = new LinkedBlockingDeque<>();
		blockingQueueForAlerts = new LinkedBlockingDeque<>();
	}

	// Success cases
@@ -322,24 +338,355 @@ public class CategoryCommandHandlerTest extends KafkaBaseIntegrationTest {
		assertEquals(categoryUpdateEvent.getCategory(), ((DeleteCategoryCancelledEvent) confirm).getCategory());
	}

	// Envía un evento de error de prepare rollback y debe provocar un evento
	// CategoryRollback con el item dentro
	// Rollback

	// Create

	// ConfirmationTimeoutException
	@Test
	public void prepareRollbackEvent_SendCategoryRollbackEvent_IfReceivesSuccess() throws Exception {
	public void createCategory_ThrowConfirmationTimeoutExceptionAndSendRollbackEvent_IfConfirmationIsNotReceived()
			throws Exception {

		try {
			Whitebox.invokeMethod(categoryCommandHandler, "save",
					new CreateCategoryCommand(CategoryDataUtil.getCategory(code + 7)));
		} catch (Exception e) {
			assertTrue(e instanceof ConfirmationTimeoutException);
		}
		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		// Envía created para meterlo en el stream y lo saca de la cola
		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "7");
		assertNotNull(rollback);
		assertEquals(EventTypes.ROLLBACK, rollback.getType());
		assertEquals(CategoryEventTypes.CREATE, ((CategoryRollbackEvent) rollback).getFailEventType());

		// LLegó un mensaje de alerta
		Message message = (Message) blockingQueueForAlerts.poll(40, TimeUnit.SECONDS);
		assertNotNull(message);
		assertEquals(AlertType.ERROR.name(), message.getType());
	}

	// ItemLockedException
	@Test
	public void createCategory_ThrowItemLockedExceptionAndSendRollbackEvent_IfItemLocked() throws Exception {

		RollbackFailedEvent rollbackFailedEvent = new RollbackFailedEvent(CategoryEventTypes.CREATE_FAILED)
				.buildFrom(CategoryDataUtil.getCategoryCreatedEvent(code + "8"));
		kafkaTemplate.send(category_topic, rollbackFailedEvent.getAggregateId(), rollbackFailedEvent);

		Thread.sleep(8000);

		try {
			Whitebox.invokeMethod(categoryCommandHandler, "save",
					new CreateCategoryCommand(CategoryDataUtil.getCategory(code + "8")));
		} catch (Exception e) {
			assertTrue(e instanceof ItemLockedException);
		}
		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(EventTypes.ROLLBACK, rollback.getType());
		assertEquals(rollbackFailedEvent.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());

		// LLegó un mensaje de alerta
		Message message = (Message) blockingQueueForAlerts.poll(40, TimeUnit.SECONDS);
		assertNotNull(message);
		assertEquals(AlertType.ERROR.name(), message.getType());
	}

	@Test
	public void prepareRollbackEvent_SendCategoryRollbackEventWithFailEventTypeEqualToCreateCategory_IfItemIsLocked()
			throws Exception {

		CreateCategoryEvent createCategoryEvent = CategoryDataUtil.getCreateEvent(code + "9");
		kafkaTemplate.send(category_topic, createCategoryEvent.getAggregateId(), createCategoryEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		Thread.sleep(8000);

		PrepareRollbackEvent event = (PrepareRollbackEvent) new CategoryAggregate(null, null)
				.getRollbackEvent(createCategoryEvent);

		kafkaTemplate.send(category_topic, event.getAggregateId(), event);

		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(CategoryEventTypes.ROLLBACK, rollback.getType());
		assertEquals(createCategoryEvent.getType(), event.getFailEventType());
		assertEquals(event.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());
		assertNull(((CategoryRollbackEvent) rollback).getLastSnapshotItem());
	}

	@Test
	public void prepareRollbackEventAfterRollbackFail_SendCategoryRollbackEventWithFailEventTypeEqualToCreateCategory_IfItemIsLocked()
			throws Exception {

		CreateCategoryEvent createCategoryEvent = CategoryDataUtil.getCreateEvent(code + "10");
		kafkaTemplate.send(category_topic, createCategoryEvent.getAggregateId(), createCategoryEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		RollbackFailedEvent rollbackFailedEvent = new RollbackFailedEvent(CategoryEventTypes.CREATE_FAILED)
				.buildFrom(createCategoryEvent);
		kafkaTemplate.send(category_topic, rollbackFailedEvent.getAggregateId(), rollbackFailedEvent);

		Thread.sleep(8000);

		PrepareRollbackEvent event = (PrepareRollbackEvent) new CategoryAggregate(null, null)
				.getRollbackEvent(rollbackFailedEvent);

		try {
			kafkaTemplate.send(category_topic, event.getAggregateId(), event);

		} catch (Exception e) {
			assertTrue(e instanceof ItemLockedException);
		}
		Event rollback = (Event) blockingQueue.poll(60, TimeUnit.SECONDS);
		assertNotNull(rollback);
		assertEquals(CategoryEventTypes.ROLLBACK, rollback.getType());
		assertEquals(event.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());
		assertNull(((CategoryRollbackEvent) rollback).getLastSnapshotItem());
	}

	// Update
	// ConfirmationTimeoutException
	@Test
	public void updateCategory_ThrowConfirmationTimeoutExceptionAndSendRollbackEvent_IfConfirmationIsNotReceived()
			throws Exception {

		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "11");
		kafkaTemplate.send(category_topic, categoryCreatedEvent.getAggregateId(), categoryCreatedEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		try {
			Whitebox.invokeMethod(categoryCommandHandler, "update", categoryCreatedEvent.getAggregateId(),
					new UpdateCategoryCommand(categoryCreatedEvent.getCategory()));
		} catch (Exception e) {
			assertTrue(e instanceof ConfirmationTimeoutException);
		}
		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(EventTypes.ROLLBACK, rollback.getType());
		assertEquals(CategoryEventTypes.UPDATE, ((CategoryRollbackEvent) rollback).getFailEventType());

		// LLegó un mensaje de alerta
		Message message = (Message) blockingQueueForAlerts.poll(40, TimeUnit.SECONDS);
		assertNotNull(message);
		assertEquals(AlertType.ERROR.name(), message.getType());
	}

	// ItemLockedException
	@Test
	public void updateCategory_ThrowItemLockedExceptionAndSendRollbackEvent_IfItemLocked() throws Exception {

		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "12");
		kafkaTemplate.send(category_topic, categoryCreatedEvent.getAggregateId(), categoryCreatedEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		RollbackFailedEvent rollbackFailedEvent = new RollbackFailedEvent(CategoryEventTypes.UPDATE_FAILED)
				.buildFrom(categoryCreatedEvent);
		kafkaTemplate.send(category_topic, rollbackFailedEvent.getAggregateId(), rollbackFailedEvent);

		Thread.sleep(8000);

		try {
			Whitebox.invokeMethod(categoryCommandHandler, "update", categoryCreatedEvent.getAggregateId(),
					new UpdateCategoryCommand(categoryCreatedEvent.getCategory()));
		} catch (Exception e) {
			assertTrue(e instanceof ItemLockedException);
		}
		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(EventTypes.ROLLBACK, rollback.getType());
		assertEquals(rollbackFailedEvent.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());

		// LLegó un mensaje de alerta
		Message message = (Message) blockingQueueForAlerts.poll(50, TimeUnit.SECONDS);
		assertNotNull(message);
		assertEquals(AlertType.ERROR.name(), message.getType());
	}

	@Test
	public void prepareRollbackEvent_SendCategoryRollbackEventWithFailEventTypeEqualToUpdateCategory_IfItemIsLocked()
			throws Exception {

		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "13");
		kafkaTemplate.send(category_topic, categoryCreatedEvent.getAggregateId(), categoryCreatedEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		UpdateCategoryEvent updateCategoryEvent = CategoryDataUtil.getUpdateEvent(code + "13");
		kafkaTemplate.send(category_topic, updateCategoryEvent.getAggregateId(), updateCategoryEvent);

		Thread.sleep(8000);

		PrepareRollbackEvent event = (PrepareRollbackEvent) new CategoryAggregate(null, null)
				.getRollbackEvent(updateCategoryEvent);

		kafkaTemplate.send(category_topic, event.getAggregateId(), event);

		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(CategoryEventTypes.ROLLBACK, rollback.getType());
		assertEquals(updateCategoryEvent.getType(), ((CategoryRollbackEvent) rollback).getFailEventType());
		assertEquals(event.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());
		assertEquals(updateCategoryEvent.getCategory(), ((CategoryRollbackEvent) rollback).getLastSnapshotItem());
	}

	@Test
	public void prepareRollbackEventAfterRollbackFail_SendCategoryRollbackEventWithFailEventTypeEqualToUpdateCategory_IfItemIsLocked()
			throws Exception {

		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "14");
		kafkaTemplate.send(category_topic, categoryCreatedEvent.getAggregateId(), categoryCreatedEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		UpdateCategoryEvent updateCategoryEvent = CategoryDataUtil.getUpdateEvent(code + "14");
		kafkaTemplate.send(category_topic, updateCategoryEvent.getAggregateId(), updateCategoryEvent);

		RollbackFailedEvent rollbackFailedEvent = new RollbackFailedEvent(CategoryEventTypes.UPDATE_FAILED)
				.buildFrom(updateCategoryEvent);
		kafkaTemplate.send(category_topic, rollbackFailedEvent.getAggregateId(), rollbackFailedEvent);

		Thread.sleep(8000);

		PrepareRollbackEvent event = CategoryDataUtil.getPrepareRollbackEvent(code + "7");
		PrepareRollbackEvent event = (PrepareRollbackEvent) new CategoryAggregate(null, null)
				.getRollbackEvent(rollbackFailedEvent);

		try {
			kafkaTemplate.send(category_topic, event.getAggregateId(), event);

		Event rollback = (Event) blockingQueue.poll(30, TimeUnit.SECONDS);
		} catch (Exception e) {
			assertTrue(e instanceof ItemLockedException);
		}
		Event rollback = (Event) blockingQueue.poll(60, TimeUnit.SECONDS);
		assertNotNull(rollback);
		assertEquals(CategoryEventTypes.ROLLBACK, rollback.getType());
		assertEquals(event.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());
		assertEquals(updateCategoryEvent.getCategory(), ((CategoryRollbackEvent) rollback).getLastSnapshotItem());
	}

	// Delete
	// ConfirmationTimeoutException
	@Test
	public void deleteCategory_ThrowConfirmationTimeoutExceptionAndSendRollbackEvent_IfConfirmationIsNotReceived()
			throws Exception {

		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "15");
		kafkaTemplate.send(category_topic, categoryCreatedEvent.getAggregateId(), categoryCreatedEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		try {
			Whitebox.invokeMethod(categoryCommandHandler, "update", categoryCreatedEvent.getAggregateId(),
					new DeleteCategoryCommand(categoryCreatedEvent.getCategory().getId()));
		} catch (Exception e) {
			assertTrue(e instanceof ConfirmationTimeoutException);
		}

		Event confirm = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(confirm);
		assertEquals(CategoryEventTypes.DELETE_CHECKED, confirm.getType());

		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(EventTypes.ROLLBACK, rollback.getType());
		assertEquals(CategoryEventTypes.CHECK_DELETE, ((CategoryRollbackEvent) rollback).getFailEventType());

		// LLegó un mensaje de alerta
		Message message = (Message) blockingQueueForAlerts.poll(40, TimeUnit.SECONDS);
		assertNotNull(message);
		assertEquals(AlertType.ERROR.name(), message.getType());
	}

	// ItemLockedException
	@Test
	public void deleteCategory_ThrowItemLockedExceptionAndSendRollbackEvent_IfItemLocked() throws Exception {

		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "16");
		kafkaTemplate.send(category_topic, categoryCreatedEvent.getAggregateId(), categoryCreatedEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		RollbackFailedEvent rollbackFailedEvent = new RollbackFailedEvent(CategoryEventTypes.DELETE_FAILED)
				.buildFrom(categoryCreatedEvent);
		kafkaTemplate.send(category_topic, rollbackFailedEvent.getAggregateId(), rollbackFailedEvent);

		Thread.sleep(8000);

		try {
			Whitebox.invokeMethod(categoryCommandHandler, "update", categoryCreatedEvent.getAggregateId(),
					new DeleteCategoryCommand(categoryCreatedEvent.getCategory().getId()));
		} catch (Exception e) {
			assertTrue(e instanceof ItemLockedException);
		}
		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(EventTypes.ROLLBACK, rollback.getType());
		assertEquals(rollbackFailedEvent.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());

		// LLegó un mensaje de alerta
		Message message = (Message) blockingQueueForAlerts.poll(50, TimeUnit.SECONDS);
		assertNotNull(message);
		assertEquals(AlertType.ERROR.name(), message.getType());
	}

	@Test
	public void prepareRollbackEvent_SendCategoryRollbackEventWithFailEventTypeEqualToDeleteCategory_IfItemIsLocked()
			throws Exception {

		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "17");
		kafkaTemplate.send(category_topic, categoryCreatedEvent.getAggregateId(), categoryCreatedEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		DeleteCategoryEvent deleteCategoryEvent = CategoryDataUtil.getDeleteEvent(code + "17");
		kafkaTemplate.send(category_topic, deleteCategoryEvent.getAggregateId(), deleteCategoryEvent);

		Thread.sleep(8000);

		PrepareRollbackEvent event = (PrepareRollbackEvent) new CategoryAggregate(null, null)
				.getRollbackEvent(deleteCategoryEvent);

		kafkaTemplate.send(category_topic, event.getAggregateId(), event);

		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(CategoryEventTypes.ROLLBACK, rollback.getType());
		assertEquals(deleteCategoryEvent.getType(), ((CategoryRollbackEvent) rollback).getFailEventType());
		assertEquals(event.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());
		assertEquals(categoryCreatedEvent.getCategory(), ((CategoryRollbackEvent) rollback).getLastSnapshotItem());
	}

	@Test
	public void prepareRollbackEventAfterRollbackFail_SendCategoryRollbackEventWithFailEventTypeEqualToDeleteCategory_IfItemIsLocked()
			throws Exception {

		CategoryCreatedEvent categoryCreatedEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "18");
		kafkaTemplate.send(category_topic, categoryCreatedEvent.getAggregateId(), categoryCreatedEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		DeleteCategoryEvent deleteCategoryEvent = CategoryDataUtil.getDeleteEvent(code + "18");
		kafkaTemplate.send(category_topic, deleteCategoryEvent.getAggregateId(), deleteCategoryEvent);

		RollbackFailedEvent rollbackFailedEvent = new RollbackFailedEvent(CategoryEventTypes.DELETE_FAILED)
				.buildFrom(deleteCategoryEvent);
		kafkaTemplate.send(category_topic, rollbackFailedEvent.getAggregateId(), rollbackFailedEvent);

		Thread.sleep(8000);

		PrepareRollbackEvent event = (PrepareRollbackEvent) new CategoryAggregate(null, null)
				.getRollbackEvent(rollbackFailedEvent);

		try {
			kafkaTemplate.send(category_topic, event.getAggregateId(), event);

		} catch (Exception e) {
			assertTrue(e instanceof ItemLockedException);
		}
		Event rollback = (Event) blockingQueue.poll(60, TimeUnit.SECONDS);
		assertNotNull(rollback);
		assertEquals(CategoryEventTypes.ROLLBACK, rollback.getType());
		assertEquals(event.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());
@@ -400,6 +747,11 @@ public class CategoryCommandHandlerTest extends KafkaBaseIntegrationTest {
		blockingQueue.offer(categoryRollbackEvent);
	}

	@KafkaListener(topics = "${broker.topic.alert}", groupId = "test")
	public void errorAlert(Message message) {
		blockingQueueForAlerts.offer(message);
	}

	@KafkaHandler(isDefault = true)
	public void defaultEvent(Object def) {

+532 −11

File changed.

Preview size limit exceeded, changes collapsed.

+370 −8

File changed.

Preview size limit exceeded, changes collapsed.

+1 −1
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ import es.redmic.test.atlascommands.integration.KafkaEmbeddedConfig;
@DirtiesContext
@KafkaListener(topics = "${broker.topic.settings}", groupId = "SettingsCommandHandlerTests")
@TestPropertySource(properties = { "spring.kafka.consumer.group-id=SettingsCommandHandler",
		"schema.registry.port=19199" })
		"schema.registry.port=19199", "rest.eventsource.timeout.ms=20000" })
public class SettingsCommandHandlerTests extends SettingsCommandHandlerBase {

	@ClassRule