Commit d2154252 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade métodos para enviar evento rollback adecuado

Añade tests que comprueban que al enviar los eventos adecuados, el stream
los procesa y genera el rollback
parent 7fc2dd4a
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import java.util.Arrays;
 */

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import es.redmic.atlaslib.dto.category.CategoryDTO;
import es.redmic.atlaslib.events.category.CategoryEventFactory;
@@ -176,6 +177,13 @@ public class CategoryEventStreams extends EventSourcingStreams {
	}

	@Override
	// NOTE(review): this is a diff excerpt — the first signature line below is the removed
	// (pre-change) KStream-based variant; only the KTable-based override exists after the commit.
	protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) {
	// Empty override: this aggregate registers no extra stream processing beyond the
	// base EventSourcingStreams pipeline.
	protected void processExtraStreams(KStream<String, Event> events, KTable<String, Event> successEventsTable) {
	}

	/**
	 * Builds the compensation (rollback) event published when a saga must be undone.
	 *
	 * @param prepareRollbackEvent event that triggered the rollback (carries failEventType).
	 * @param lastSuccessEvent     last successfully applied event; must be a CategoryEvent,
	 *                             its item becomes the rollback snapshot.
	 * @return a CategoryRollback event wrapping the last known-good category state.
	 */
	@Override
	protected Event getRollbackEvent(Event prepareRollbackEvent, Event lastSuccessEvent) {

		CategoryEvent successEvent = (CategoryEvent) lastSuccessEvent;

		return CategoryEventFactory.getEvent(prepareRollbackEvent, CategoryEventTypes.ROLLBACK,
				successEvent.getCategory());
	}
}
+10 −5
Original line number Diff line number Diff line
@@ -324,11 +324,11 @@ public class LayerEventStreams extends EventSourcingStreams {
	}

	@Override
	// NOTE(review): diff excerpt — the first signature line is the removed KStream-based
	// variant; only the KTable-based override below exists after the commit.
	protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) {
	protected void processExtraStreams(KStream<String, Event> events, KTable<String, Event> successEventsTable) {

		// Layer-specific pipeline: handle refresh confirmations…
		processRefreshSuccessStream(events);

		// …and refresh failures. NOTE(review): the first call line is the removed version
		// (joined against the raw snapshot stream); the second, KTable-based call replaces it.
		processRefreshFailedStream(events, snapshotEvents);
		processRefreshFailedStream(events, successEventsTable);
	}

	protected void processRefreshSuccessStream(KStream<String, Event> events) {
@@ -377,14 +377,12 @@ public class LayerEventStreams extends EventSourcingStreams {
	 * refrescar, envía evento de cancelación
	 */

	protected void processRefreshFailedStream(KStream<String, Event> events, KStream<String, Event> successEvents) {
	protected void processRefreshFailedStream(KStream<String, Event> events, KTable<String, Event> successEventsTable) {

		// Stream filtrado por eventos de fallo al modificar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (LayerEventTypes.REFRESH_FAILED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
@@ -417,4 +415,11 @@ public class LayerEventStreams extends EventSourcingStreams {

		return themeInspireDTO != null ? themeInspireDTO.getId() : null;
	}

	/**
	 * Builds the compensation (rollback) event published when a saga must be undone.
	 *
	 * @param prepareRollbackEvent event that triggered the rollback (carries failEventType).
	 * @param lastSuccessEvent     last successfully applied event; must be a LayerEvent,
	 *                             its item becomes the rollback snapshot.
	 * @return a LayerRollback event wrapping the last known-good layer state.
	 */
	@Override
	protected Event getRollbackEvent(Event prepareRollbackEvent, Event lastSuccessEvent) {

		LayerEvent successEvent = (LayerEvent) lastSuccessEvent;

		return LayerEventFactory.getEvent(prepareRollbackEvent, LayerEventTypes.ROLLBACK,
				successEvent.getLayer());
	}
}
+9 −1
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ package es.redmic.atlascommands.streams;
 */

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import es.redmic.atlaslib.dto.themeinspire.ThemeInspireDTO;
import es.redmic.atlaslib.events.themeinspire.ThemeInspireEventFactory;
@@ -165,6 +166,13 @@ public class ThemeInspireEventStreams extends EventSourcingStreams {
	}

	@Override
	// NOTE(review): diff excerpt — the first signature line below is the removed
	// (pre-change) KStream-based variant; only the KTable-based override exists after the commit.
	protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) {
	// Empty override: this aggregate registers no extra stream processing beyond the
	// base EventSourcingStreams pipeline.
	protected void processExtraStreams(KStream<String, Event> events, KTable<String, Event> successEventsTable) {
	}

	/**
	 * Builds the compensation (rollback) event published when a saga must be undone.
	 *
	 * @param prepareRollbackEvent event that triggered the rollback (carries failEventType).
	 * @param lastSuccessEvent     last successfully applied event; must be a ThemeInspireEvent,
	 *                             its item becomes the rollback snapshot.
	 * @return a ThemeInspireRollback event wrapping the last known-good item state.
	 */
	@Override
	protected Event getRollbackEvent(Event prepareRollbackEvent, Event lastSuccessEvent) {

		ThemeInspireEvent successEvent = (ThemeInspireEvent) lastSuccessEvent;

		return ThemeInspireEventFactory.getEvent(prepareRollbackEvent, ThemeInspireEventTypes.ROLLBACK,
				successEvent.getThemeInspire());
	}
}
+31 −1
Original line number Diff line number Diff line
@@ -68,6 +68,7 @@ import es.redmic.atlaslib.events.category.delete.DeleteCategoryCheckFailedEvent;
import es.redmic.atlaslib.events.category.delete.DeleteCategoryCheckedEvent;
import es.redmic.atlaslib.events.category.delete.DeleteCategoryConfirmedEvent;
import es.redmic.atlaslib.events.category.delete.DeleteCategoryFailedEvent;
import es.redmic.atlaslib.events.category.fail.CategoryRollbackEvent;
import es.redmic.atlaslib.events.category.update.CategoryUpdatedEvent;
import es.redmic.atlaslib.events.category.update.UpdateCategoryCancelledEvent;
import es.redmic.atlaslib.events.category.update.UpdateCategoryConfirmedEvent;
@@ -75,6 +76,7 @@ import es.redmic.atlaslib.events.category.update.UpdateCategoryEvent;
import es.redmic.atlaslib.events.category.update.UpdateCategoryFailedEvent;
import es.redmic.atlaslib.unit.utils.CategoryDataUtil;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.fail.PrepareRollbackEvent;
import es.redmic.brokerlib.listener.SendListener;
import es.redmic.exception.data.DeleteItemException;
import es.redmic.exception.data.ItemAlreadyExistException;
@@ -88,7 +90,7 @@ import es.redmic.testutils.kafka.KafkaBaseIntegrationTest;
@DirtiesContext
@KafkaListener(topics = "${broker.topic.category}", groupId = "CategoryCommandHandlerTest")
@TestPropertySource(properties = { "spring.kafka.consumer.group-id=CategoryCommandHandler",
		"schema.registry.port=19099" })
		"schema.registry.port=19099", "rest.eventsource.timeout.ms=20000" })
public class CategoryCommandHandlerTest extends KafkaBaseIntegrationTest {

	protected static Logger logger = LogManager.getLogger();
@@ -320,6 +322,28 @@ public class CategoryCommandHandlerTest extends KafkaBaseIntegrationTest {
		assertEquals(categoryUpdateEvent.getCategory(), ((DeleteCategoryCancelledEvent) confirm).getCategory());
	}

	// Sends a prepare-rollback event; the stream must react by emitting a
	// CategoryRollback event that carries the last successfully stored item.
	@Test
	public void prepareRollbackEvent_SendCategoryRollbackEvent_IfReceivesSuccess() throws Exception {

		// Publish a created event so the stream has a last-success snapshot,
		// then drain it from the listener queue.
		CategoryCreatedEvent createdEvent = CategoryDataUtil.getCategoryCreatedEvent(code + "7");
		kafkaTemplate.send(category_topic, createdEvent.getAggregateId(), createdEvent);
		blockingQueue.poll(10, TimeUnit.SECONDS);

		// Trigger the rollback path.
		PrepareRollbackEvent prepareRollback = CategoryDataUtil.getPrepareRollbackEvent(code + "7");
		kafkaTemplate.send(category_topic, prepareRollback.getAggregateId(), prepareRollback);

		Event rollback = (Event) blockingQueue.poll(30, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(CategoryEventTypes.ROLLBACK, rollback.getType());
		assertEquals(prepareRollback.getFailEventType(), ((CategoryRollbackEvent) rollback).getFailEventType());
		assertEquals(createdEvent.getCategory(), ((CategoryRollbackEvent) rollback).getLastSnapshotItem());
	}

	@KafkaHandler
	public void categoryCreatedEvent(CategoryCreatedEvent categoryCreatedEvent) {

@@ -368,6 +392,12 @@ public class CategoryCommandHandlerTest extends KafkaBaseIntegrationTest {
		blockingQueue.offer(deleteCategoryCheckFailedEvent);
	}

	// Test listener: forwards CategoryRollback events from the topic into the
	// blocking queue so the test body can assert on them.
	@KafkaHandler
	public void categoryRollbackEvent(CategoryRollbackEvent categoryRollbackEvent) {

		blockingQueue.offer(categoryRollbackEvent);
	}

	@KafkaHandler(isDefault = true)
	public void defaultEvent(Object def) {

+35 −0
Original line number Diff line number Diff line
@@ -71,6 +71,7 @@ import es.redmic.atlaslib.events.layer.delete.DeleteLayerCheckedEvent;
import es.redmic.atlaslib.events.layer.delete.DeleteLayerConfirmedEvent;
import es.redmic.atlaslib.events.layer.delete.DeleteLayerFailedEvent;
import es.redmic.atlaslib.events.layer.delete.LayerDeletedEvent;
import es.redmic.atlaslib.events.layer.fail.LayerRollbackEvent;
import es.redmic.atlaslib.events.layer.refresh.LayerRefreshedEvent;
import es.redmic.atlaslib.events.layer.refresh.RefreshLayerCancelledEvent;
import es.redmic.atlaslib.events.layer.refresh.RefreshLayerConfirmedEvent;
@@ -87,6 +88,7 @@ import es.redmic.atlaslib.events.themeinspire.update.ThemeInspireUpdatedEvent;
import es.redmic.atlaslib.unit.utils.LayerDataUtil;
import es.redmic.atlaslib.unit.utils.ThemeInspireDataUtil;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.fail.PrepareRollbackEvent;
import es.redmic.brokerlib.listener.SendListener;
import es.redmic.exception.data.DeleteItemException;
import es.redmic.exception.data.ItemAlreadyExistException;
@@ -477,6 +479,33 @@ public class LayerCommandHandlerTest extends KafkaBaseIntegrationTest {
		assertEquals(layerUpdateEvent.getLayer(), ((DeleteLayerCancelledEvent) confirm).getLayer());
	}

	// Sends a prepare-rollback event; the stream must react by emitting a
	// LayerRollback event that carries the last successfully stored item.
	@Test
	public void prepareRollbackEvent_SendLayerRollbackEvent_IfReceivesSuccess() throws Exception {

		// Publish a created event so the stream has a last-success snapshot,
		// then drain it from the listener queue.
		LayerCreatedEvent createdEvent = LayerDataUtil.getLayerCreatedEvent(code + "7");
		kafkaTemplate.send(layer_topic, createdEvent.getAggregateId(), createdEvent);
		blockingQueue.poll(30, TimeUnit.SECONDS);

		// NOTE(review): fixed sleep presumably waits for the success KTable to be
		// materialized before the rollback trigger arrives — confirm; an await/poll
		// loop would make the test faster and less flaky.
		Thread.sleep(8000);

		// Trigger the rollback path.
		PrepareRollbackEvent prepareRollback = LayerDataUtil.getPrepareRollbackEvent(code + "7");
		kafkaTemplate.send(layer_topic, prepareRollback.getAggregateId(), prepareRollback);

		Event rollback = (Event) blockingQueue.poll(40, TimeUnit.SECONDS);

		assertNotNull(rollback);
		assertEquals(LayerEventTypes.ROLLBACK, rollback.getType());
		assertEquals(prepareRollback.getFailEventType(), ((LayerRollbackEvent) rollback).getFailEventType());
		assertEquals(createdEvent.getLayer().getId(),
				((LayerRollbackEvent) rollback).getLastSnapshotItem().getId());
		assertEquals(createdEvent.getLayer().getUpdated().getMillis(),
				((LayerRollbackEvent) rollback).getLastSnapshotItem().getUpdated().getMillis());
	}

	@KafkaHandler
	public void createLayerEvent(CreateLayerEvent createLayerEvent) {

@@ -543,6 +572,12 @@ public class LayerCommandHandlerTest extends KafkaBaseIntegrationTest {
		blockingQueue.offer(deleteLayerCheckFailedEvent);
	}

	// Test listener: forwards LayerRollback events from the topic into the
	// blocking queue so the test body can assert on them.
	@KafkaHandler
	public void layerRollbackEvent(LayerRollbackEvent layerRollbackEvent) {

		blockingQueue.offer(layerRollbackEvent);
	}

	@KafkaHandler(isDefault = true)
	public void defaultEvent(Object def) {

Loading