Commit b0090321 authored by Noel Alonso's avatar Noel Alonso
Browse files

Divide los streaming y añade assert

Para controlar condiciones básicas para el streaming se añaden assert que en
caso de no cumplirse, producirá un error.
parent 9b165f7d
Loading
Loading
Loading
Loading
+75 −65
Original line number Diff line number Diff line
@@ -44,30 +44,21 @@ public class VesselTypeEventStreams extends EventStreams {

	private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) {

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}
		assert requestEvent.getType().equals(VesselTypeEventType.CREATE_VESSELTYPE.name());

		if (!(requestEvent.getType().equals(VesselTypeEventType.CREATE_VESSELTYPE.name()))) {
			logger.error("Se esperaba un evento de petición de tipo CREATE para VesselType.");
		assert confirmedEvent.getType().equals(VesselTypeEventType.CREATE_VESSELTYPE_CONFIRMED.name());

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}

		logger.debug("Creando evento de creado exitoso para VesselType");

		VesselTypeDTO vesselType = ((VesselTypeEvent) requestEvent).getVesselType();

		if (confirmedEvent.getType().equals(VesselTypeEventType.CREATE_VESSELTYPE_CONFIRMED.name())) {

		logger.info("Enviando evento VesselTypeCreatedEvent para: " + confirmedEvent.getAggregateId());

		VesselTypeCreatedEvent successfulEvent = new VesselTypeCreatedEvent().buildFrom(confirmedEvent);
		successfulEvent.setVesselType(vesselType);
		return successfulEvent;
		} else {
			logger.error("Se esperaba un evento de confirmación de tipo CREATE para VesselType.");
			return null;
		}
	}

	@Override
@@ -89,12 +80,11 @@ public class VesselTypeEventStreams extends EventStreams {

	private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) {

		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}
		assert requestEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE.name());

		assert confirmedEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE_CONFIRMED.name());

		if (!(requestEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE.name()))) {
			logger.error("Se esperaba un evento de petición de UPDATE para VesselType.");
		if (!isSameSession(confirmedEvent, requestEvent)) {
			return null;
		}

@@ -102,57 +92,71 @@ public class VesselTypeEventStreams extends EventStreams {

		VesselTypeDTO vesselType = ((VesselTypeEvent) requestEvent).getVesselType();

		if (confirmedEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE_CONFIRMED.name())) {

		logger.info("Enviando evento VesselTypeUpdatedEvent para: " + confirmedEvent.getAggregateId());

		VesselTypeUpdatedEvent successfulEvent = new VesselTypeUpdatedEvent().buildFrom(confirmedEvent);
		successfulEvent.setVesselType(vesselType);
		return successfulEvent;
		} else {
			logger.error("Se esperaba un evento de confirmación de tipo UPDATE para VesselType.");
			return null;
		}
	}

	@Override
	protected void processFailedChangeStream(KStream<String, Event> vesselTypeEvents) {

		// Stream filtrado por eventos de fallo al modificar y borrar
		KStream<String, Event> failedEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventType.UPDATE_VESSELTYPE_FAILED.toString().equals(event.getType())
						|| VesselTypeEventType.DELETE_VESSELTYPE_FAILED.toString().equals(event.getType())));

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)

		KStream<String, Event> successEvents = vesselTypeEvents
				.filter((id, event) -> (VesselTypeEventType.VESSELTYPE_CREATED.toString().equals(event.getType())
						|| VesselTypeEventType.VESSELTYPE_UPDATED.toString().equals(event.getType())));

		processUpdateFailedStream(vesselTypeEvents, successEvents);

		processDeleteFailedStream(vesselTypeEvents, successEvents);

	}

	protected void processUpdateFailedStream(KStream<String, Event> vesselTypeEvents,
			KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al modificar
		KStream<String, Event> failedEvents = vesselTypeEvents.filter(
				(id, event) -> (VesselTypeEventType.UPDATE_VESSELTYPE_FAILED.toString().equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents.join(successEventsTable,
				(failedEvent, lastSuccessEvent) -> getCancelledEvent(failedEvent, lastSuccessEvent)).to(topic);
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getUpdateCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

	private Event getCancelledEvent(Event failedEvent, Event lastSuccessEvent) {
	protected void processDeleteFailedStream(KStream<String, Event> vesselTypeEvents,
			KStream<String, Event> successEvents) {

		if (!(lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_CREATED.name())
				|| lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_UPDATED.name()))) {
			logger.error("Se esperaba un evento satisfactorio de tipo CREATED o UPDATED.");
			return null;
		// Stream filtrado por eventos de fallo al borrar
		KStream<String, Event> failedEvents = vesselTypeEvents.filter(
				(id, event) -> (VesselTypeEventType.DELETE_VESSELTYPE_FAILED.toString().equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getDeleteCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

		logger.debug("Creando evento de cancelación para VesselType");
	private Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert failedEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE_FAILED.name());

		assert lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_CREATED.name())
				|| lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_UPDATED.name());

		VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType();

		EventError eventError = (EventError) failedEvent;

		if (failedEvent.getType().equals(VesselTypeEventType.UPDATE_VESSELTYPE_FAILED.name())) {

		logger.info("Enviando evento UpdateVesselTypeCancelledEvent para: " + failedEvent.getAggregateId());

		UpdateVesselTypeCancelledEvent cancelledEvent = new UpdateVesselTypeCancelledEvent().buildFrom(failedEvent);
@@ -160,8 +164,18 @@ public class VesselTypeEventStreams extends EventStreams {
		cancelledEvent.setExceptionType(eventError.getExceptionType());
		cancelledEvent.setArguments(eventError.getArguments());
		return cancelledEvent;
	}

	private Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert failedEvent.getType().equals(VesselTypeEventType.DELETE_VESSELTYPE_FAILED.name());

		assert lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_CREATED.name())
				|| lastSuccessEvent.getType().equals(VesselTypeEventType.VESSELTYPE_UPDATED.name());

		} else if (failedEvent.getType().equals(VesselTypeEventType.DELETE_VESSELTYPE_FAILED.name())) {
		VesselTypeDTO vesselType = ((VesselTypeEvent) lastSuccessEvent).getVesselType();

		EventError eventError = (EventError) failedEvent;

		logger.info("Enviar evento DeleteVesselTypeCancelledEvent para: " + failedEvent.getAggregateId());

@@ -170,10 +184,6 @@ public class VesselTypeEventStreams extends EventStreams {
		cancelledEvent.setExceptionType(eventError.getExceptionType());
		cancelledEvent.setArguments(eventError.getArguments());
		return cancelledEvent;
		} else {
			logger.error("Se esperaba un evento fallido de tipo UPDATE o DELETE para VesselType.");
			return null;
		}
	}

	@Override