Commit 4b84eb83 authored by Noel Alonso's avatar Noel Alonso
Browse files

Pasa nuevos parámetros a streams

Pasa nombre del topic de agregación a VesselEventStreams
Cambia referencias a clases reubicadas
parent 0d7e0082
Loading
Loading
Loading
Loading
+27 −25
Original line number Diff line number Diff line
@@ -12,7 +12,8 @@ import org.springframework.stereotype.Component;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.commandslib.commands.CommandHandler;
import es.redmic.commandslib.statestore.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamConfig.Builder;
import es.redmic.exception.factory.ExceptionFactory;
import es.redmic.vesselscommands.aggregate.VesselAggregate;
import es.redmic.vesselscommands.config.UserService;
@@ -42,19 +43,22 @@ public class VesselCommandHandler extends CommandHandler {
	private String bootstrapServers;

	@Value("${broker.topic.vessel}")
	private String vessel_topic;
	private String vesselTopic;

	@Value("${broker.topic.vessels.agg.by.vesseltype}")
	private String vesselsAggByVesselTypeTopic;

	@Value("${broker.state.store.vessels.dir}")
	private String state_store_vessels_dir;
	private String stateStoreVesselsDir;

	@Value("${broker.state.store.vessels.id}")
	private String vessels_id_config;
	private String vesselsIdConfig;

	@Value("${broker.stream.events.vessels.id}")
	private String vessels_events_stream_id;
	private String vesselsEventsStreamId;

	@Value("${broker.topic.vessel-type}")
	private String vessel_type_topic;
	private String vesselTypeTopic;

	@Value("${stream.windows.time.ms}")
	private Long streamWindowsTime;
@@ -83,24 +87,22 @@ public class VesselCommandHandler extends CommandHandler {

		// @formatter:off
		
		vesselStateStore = new VesselStateStore(
				StreamConfig.Builder
		Builder config = StreamConfig.Builder
			.bootstrapServers(bootstrapServers)
			.schemaRegistry(schemaRegistry)
					.serviceId(vessels_id_config)
					.stateStoreDir(state_store_vessels_dir)
					.topic(vessel_topic)
			.stateStoreDir(stateStoreVesselsDir)
			.topic(vesselTopic);
		
		vesselStateStore = new VesselStateStore(
				config
					.serviceId(vesselsIdConfig)
					.build(), alertService);

		new VesselEventStreams(
				StreamConfig.Builder
					.bootstrapServers(bootstrapServers)
					.schemaRegistry(schemaRegistry)
					.serviceId(vessels_events_stream_id)
					.stateStoreDir(state_store_vessels_dir)
					.topic(vessel_topic)
				config
					.serviceId(vesselsEventsStreamId)
					.windowsTime(streamWindowsTime)
					.build(), vessel_type_topic, alertService);
					.build(), vesselTypeTopic, vesselsAggByVesselTypeTopic, alertService);
		// @formatter:on
	}

@@ -133,7 +135,7 @@ public class VesselCommandHandler extends CommandHandler {
		CompletableFuture<VesselDTO> completableFuture = getCompletableFeature(event.getSessionId(), agg.getVessel());

		// Emite evento para enviar a kafka
		publishToKafka(event, vessel_topic);
		publishToKafka(event, vesselTopic);

		// Se resuelve con un timeout mayor, establecido para procesos automáticos
		if (event.getUserId().equals(REDMIC_PROCESS))
@@ -166,7 +168,7 @@ public class VesselCommandHandler extends CommandHandler {
		CompletableFuture<VesselDTO> completableFuture = getCompletableFeature(event.getSessionId(), agg.getVessel());

		// Emite evento para enviar a kafka
		publishToKafka(event, vessel_topic);
		publishToKafka(event, vesselTopic);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
@@ -193,7 +195,7 @@ public class VesselCommandHandler extends CommandHandler {
		CompletableFuture<VesselDTO> completableFuture = getCompletableFeature(event.getSessionId(), agg.getVessel());

		// Emite evento para enviar a kafka
		publishToKafka(event, vessel_topic);
		publishToKafka(event, vesselTopic);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
@@ -231,7 +233,7 @@ public class VesselCommandHandler extends CommandHandler {

		logger.info("Enviando evento VesselDeletedEvent para: " + event.getAggregateId());

		publishToKafka(new VesselDeletedEvent().buildFrom(event), vessel_topic);
		publishToKafka(new VesselDeletedEvent().buildFrom(event), vesselTopic);
	}

	@KafkaHandler
@@ -251,7 +253,7 @@ public class VesselCommandHandler extends CommandHandler {
		evt.setExceptionType(event.getExceptionType());
		evt.setArguments(event.getArguments());

		publishToKafka(evt, vessel_topic);
		publishToKafka(evt, vesselTopic);
	}

	@KafkaHandler
+24 −24
Original line number Diff line number Diff line
@@ -12,7 +12,8 @@ import org.springframework.stereotype.Component;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.commandslib.commands.CommandHandler;
import es.redmic.commandslib.statestore.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamConfig.Builder;
import es.redmic.exception.factory.ExceptionFactory;
import es.redmic.vesselscommands.aggregate.VesselTypeAggregate;
import es.redmic.vesselscommands.config.UserService;
@@ -42,16 +43,16 @@ public class VesselTypeCommandHandler extends CommandHandler {
	protected String bootstrapServers;

	@Value("${broker.topic.vessel-type}")
	private String vessel_type_topic;
	private String vesselTypeTopic;

	@Value("${broker.state.store.vesseltypes.dir}")
	private String state_store_vesseltypes_dir;
	private String stateStoreVesseltypesDir;

	@Value("${broker.state.store.vesseltypes.id}")
	private String vesseltypes_id_config;
	private String vesseltypesIdConfig;

	@Value("${broker.stream.events.vesseltypes.id}")
	private String vesseltypes_events_stream_id;
	private String vesseltypesEventsStreamId;

	@Value("${stream.windows.time.ms}")
	private Long streamWindowsTime;
@@ -73,21 +74,20 @@ public class VesselTypeCommandHandler extends CommandHandler {

		// @formatter:off
		
		vesselTypeStateStore = new VesselTypeStateStore(
				StreamConfig.Builder
		Builder config = StreamConfig.Builder
				.bootstrapServers(bootstrapServers)
				.schemaRegistry(schemaRegistry)
					.serviceId(vesseltypes_id_config)
					.stateStoreDir(state_store_vesseltypes_dir)
					.topic(vessel_type_topic)
				.stateStoreDir(stateStoreVesseltypesDir)
				.topic(vesselTypeTopic);
		
		vesselTypeStateStore = new VesselTypeStateStore(
				config
					.serviceId(vesseltypesIdConfig)
					.build(), alertService);

		new VesselTypeEventStreams(StreamConfig.Builder
				.bootstrapServers(bootstrapServers)
				.schemaRegistry(schemaRegistry)
				.serviceId(vesseltypes_events_stream_id)
				.stateStoreDir(state_store_vesseltypes_dir)
				.topic(vessel_type_topic)
		new VesselTypeEventStreams(
				config
					.serviceId(vesseltypesEventsStreamId)
					.windowsTime(streamWindowsTime)
					.build(), alertService);
		
@@ -119,7 +119,7 @@ public class VesselTypeCommandHandler extends CommandHandler {
				agg.getVesselType());

		// Emite evento para enviar a kafka
		publishToKafka(event, vessel_type_topic);
		publishToKafka(event, vesselTypeTopic);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
@@ -146,7 +146,7 @@ public class VesselTypeCommandHandler extends CommandHandler {
				agg.getVesselType());

		// Emite evento para enviar a kafka
		publishToKafka(event, vessel_type_topic);
		publishToKafka(event, vesselTypeTopic);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
@@ -174,7 +174,7 @@ public class VesselTypeCommandHandler extends CommandHandler {
				agg.getVesselType());

		// Emite evento para enviar a kafka
		publishToKafka(event, vessel_type_topic);
		publishToKafka(event, vesselTypeTopic);

		// Obtiene el resultado cuando se resuelva la espera
		return getResult(event.getSessionId(), completableFuture);
@@ -212,7 +212,7 @@ public class VesselTypeCommandHandler extends CommandHandler {

		logger.info("Enviando evento VesselTypeDeletedEvent para: " + event.getAggregateId());

		publishToKafka(new VesselTypeDeletedEvent().buildFrom(event), vessel_type_topic);
		publishToKafka(new VesselTypeDeletedEvent().buildFrom(event), vesselTypeTopic);
	}

	@KafkaHandler
@@ -232,7 +232,7 @@ public class VesselTypeCommandHandler extends CommandHandler {
		evt.setExceptionType(event.getExceptionType());
		evt.setArguments(event.getArguments());

		publishToKafka(evt, vessel_type_topic);
		publishToKafka(evt, vesselTypeTopic);
	}

	@KafkaHandler