Commit 51e745cb authored by Noel Alonso's avatar Noel Alonso
Browse files

Elimina la resolución de command en timeseries

Si se trata de un proceso automático, no se resuelve el comando para
enviar una respuesta, ya que la inserción se realiza vía stream no
command.
parent 15d3e9d7
Loading
Loading
Loading
Loading
+21 −16
Original line number Diff line number Diff line
@@ -77,9 +77,6 @@ public class VesselCommandHandler extends CommandHandler {
	@Value("${stream.windows.time.ms}")
	private Long streamWindowsTime;

	@Value("${process.eventsource.timeout.ms}")
	private long processTimeoutMS;

	private final String REDMIC_PROCESS = "REDMIC_PROCESS";

	private VesselStateStore vesselStateStore;
@@ -143,10 +140,6 @@ public class VesselCommandHandler extends CommandHandler {
		// Emite evento para enviar a kafka
		publishToKafka(event, vesselTopic);

		// Se resuelve con un timeout mayor, establecido para procesos automáticos
		if (event.getUserId().equals(REDMIC_PROCESS))
			return getResult(processTimeoutMS, event.getSessionId(), completableFuture);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
	}
@@ -217,8 +210,10 @@ public class VesselCommandHandler extends CommandHandler {

		// El evento Creado se envió desde el stream

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId());
		}
	}

	@KafkaHandler
	private void listen(UpdateVesselEnrichedEvent event) {
@@ -237,8 +232,10 @@ public class VesselCommandHandler extends CommandHandler {

		// El evento Modificado se envió desde el stream

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId());
		}
	}

	@KafkaHandler
	private void listen(DeleteVesselCheckedEvent event) {
@@ -257,8 +254,10 @@ public class VesselCommandHandler extends CommandHandler {

		logger.debug("Vessel eliminado " + event.getAggregateId());

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId());
		}
	}

	@KafkaHandler
	private void listen(CreateVesselFailedEvent event) {
@@ -272,9 +271,11 @@ public class VesselCommandHandler extends CommandHandler {

		logger.debug("Error creando Vessel " + event.getAggregateId());

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId(),
					ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
		}
	}

	@KafkaHandler
	private void listen(UpdateVesselCancelledEvent event) {
@@ -283,9 +284,11 @@ public class VesselCommandHandler extends CommandHandler {

		// El evento Cancelled se envía desde el stream

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId(),
					ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
		}
	}

	@KafkaHandler
	private void listen(DeleteVesselCheckFailedEvent event) {
@@ -301,7 +304,9 @@ public class VesselCommandHandler extends CommandHandler {

		// El evento Cancelled se envía desde el stream

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId(),
					ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
		}
	}
}
+21 −16
Original line number Diff line number Diff line
@@ -69,9 +69,6 @@ public class VesselTrackingCommandHandler extends CommandHandler {
	@Value("${stream.windows.time.ms}")
	private Long streamWindowsTime;

	@Value("${process.eventsource.timeout.ms}")
	private long processTimeoutMS;

	private final String REDMIC_PROCESS = "REDMIC_PROCESS";

	private VesselTrackingStateStore vesselTrackingStateStore;
@@ -135,10 +132,6 @@ public class VesselTrackingCommandHandler extends CommandHandler {
		// Emite evento para enviar a kafka
		publishToKafka(event, vesselTrackingTopic);

		// Se resuelve con un timeout mayor, establecido para procesos automáticos
		if (event.getUserId().equals(REDMIC_PROCESS))
			return getResult(processTimeoutMS, event.getSessionId(), completableFuture);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
	}
@@ -213,8 +206,10 @@ public class VesselTrackingCommandHandler extends CommandHandler {

		// El evento Creado se envió desde el stream

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId());
		}
	}

	@KafkaHandler
	private void listen(UpdateVesselTrackingEnrichedEvent event) {
@@ -231,8 +226,10 @@ public class VesselTrackingCommandHandler extends CommandHandler {

		// El evento Modificado se envió desde el stream

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId());
		}
	}

	@KafkaHandler
	private void listen(DeleteVesselTrackingConfirmedEvent event) {
@@ -246,8 +243,10 @@ public class VesselTrackingCommandHandler extends CommandHandler {

		logger.debug("VesselTracking eliminado " + event.getAggregateId());

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId());
		}
	}

	@KafkaHandler
	private void listen(CreateVesselTrackingFailedEvent event) {
@@ -261,9 +260,11 @@ public class VesselTrackingCommandHandler extends CommandHandler {

		logger.debug("Error creando VesselTracking " + event.getAggregateId());

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId(),
					ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
		}
	}

	@KafkaHandler
	private void listen(UpdateVesselTrackingCancelledEvent event) {
@@ -272,9 +273,11 @@ public class VesselTrackingCommandHandler extends CommandHandler {

		// El evento Cancelled se envía desde el stream

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId(),
					ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
		}
	}

	@KafkaHandler
	private void listen(DeleteVesselTrackingCancelledEvent event) {
@@ -283,7 +286,9 @@ public class VesselTrackingCommandHandler extends CommandHandler {

		// El evento Cancelled se envía desde el stream

		if (!event.getUserId().equals(REDMIC_PROCESS)) {
			resolveCommand(event.getSessionId(),
					ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
		}
	}
}
+0 −1
Original line number Diff line number Diff line
@@ -47,7 +47,6 @@ spring.kafka.consumer.heartbeat-interval.seconds=100
spring.kafka.consumer.properties.session.timeout.ms=300000

rest.eventsource.timeout.ms=300000
process.eventsource.timeout.ms=2400000
stream.windows.time.ms=2400000

spring.kafka.properties.specific.avro.reader=true