Commit b9060efb authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade stream para crear tracking de vessel

En lugar de hacerlo vía command, se hace un join entre los eventos de
vesselTracking y los enviados de ais. Solo si no se ha procesado antes se
envía a crear.

Adapta test
parent 831c10d0
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -63,6 +63,9 @@ public class VesselTrackingCommandHandler extends CommandHandler {
	@Value("${broker.topic.vessel}")
	private String vesselTopic;

	@Value("${broker.topic.realtime.tracking.vessels}")
	private String realtimeTrackingVesselsTopic;

	@Value("${stream.windows.time.ms}")
	private Long streamWindowsTime;

@@ -102,7 +105,7 @@ public class VesselTrackingCommandHandler extends CommandHandler {
				config
					.serviceId(vesselTrackingEventsStreamId)
					.windowsTime(streamWindowsTime)
					.build(), vesselTopic, alertService);
					.build(), vesselTopic, realtimeTrackingVesselsTopic, alertService);
		// @formatter:on
	}

+49 −2
Original line number Diff line number Diff line
package es.redmic.vesselscommands.streams;

import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.joda.time.DateTime;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
@@ -16,18 +18,26 @@ import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventFactory;
import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventTypes;
import es.redmic.vesselslib.events.vesseltracking.common.VesselTrackingEvent;
import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEnrichedEvent;
import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEvent;
import es.redmic.vesselslib.events.vesseltracking.update.UpdateVesselTrackingEnrichedEvent;

public class VesselTrackingEventStreams extends EventSourcingStreams {

	private String vesselTopic;

	private String realtimeTrackingVesselsTopic;

	private GlobalKTable<String, Event> vessel;

	public VesselTrackingEventStreams(StreamConfig config, String vesselTopic, AlertService alertService) {
	private KStream<String, VesselTrackingDTO> realtimeTracking;

	private final String REDMIC_PROCESS = "REDMIC_PROCESS";

	public VesselTrackingEventStreams(StreamConfig config, String vesselTopic, String realtimeTrackingVesselsTopic,
			AlertService alertService) {
		super(config, alertService);
		this.vesselTopic = vesselTopic + snapshotTopicSuffix;

		this.realtimeTrackingVesselsTopic = realtimeTrackingVesselsTopic;
		logger.info("Arrancado servicio de streaming para event sourcing de Vessel tracking con Id: " + this.serviceId);
		init();
	}
@@ -41,6 +51,8 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
	protected void createExtraStreams() {

		vessel = builder.globalTable(vesselTopic);

		realtimeTracking = builder.stream(realtimeTrackingVesselsTopic);
	}

	/**
@@ -241,4 +253,39 @@ public class VesselTrackingEventStreams extends EventSourcingStreams {
	protected void processPostUpdateStream(KStream<String, Event> events) {
		// En series temporales no se hace postUpdate
	}

	@Override
	protected void processExtraStreams(KStream<String, Event> events) {

		createTrackingFromRealtimeTrackingVessel(realtimeTracking, events);
	}

	private void createTrackingFromRealtimeTrackingVessel(KStream<String, VesselTrackingDTO> realTimeTracking,
			KStream<String, Event> events) {

		realTimeTracking
				.leftJoin(events,
						(vesselTrackingDTO, vesselTrackingEvent) -> getCreateTrackingFromRealtimeTrackingVessel(
								vesselTrackingDTO, vesselTrackingEvent),
						JoinWindows.of(windowsTime))
				.filter((k, v) -> (v != null)).to(topic);
	}

	private Event getCreateTrackingFromRealtimeTrackingVessel(VesselTrackingDTO vesselTrackingDTO,
			Event vesselTrackingEvent) {

		if (vesselTrackingEvent == null) {

			vesselTrackingDTO.getProperties().setInserted(DateTime.now());
			vesselTrackingDTO.getProperties().setUpdated(DateTime.now());

			CreateVesselTrackingEvent evt = new CreateVesselTrackingEvent(vesselTrackingDTO);
			evt.setAggregateId(vesselTrackingDTO.getId());
			evt.setVersion(1);
			evt.setUserId(REDMIC_PROCESS);

			return evt;
		}
		return null;
	}
}
+7 −15
Original line number Diff line number Diff line
@@ -14,12 +14,12 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.reflect.Whitebox;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
@@ -30,8 +30,6 @@ import es.redmic.brokerlib.avro.common.Event;
import es.redmic.test.vesselscommands.integration.KafkaEmbeddedConfig;
import es.redmic.testutils.kafka.KafkaBaseIntegrationTest;
import es.redmic.vesselscommands.VesselsCommandsApplication;
import es.redmic.vesselscommands.handler.VesselTrackingCommandHandler;
import es.redmic.vesselscommands.service.VesselTrackingCommandService;
import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEvent;
@@ -48,9 +46,6 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest {
	@Value("${broker.topic.realtime.tracking.vessels}")
	String REALTIME_TRACKING_VESSELS_TOPIC;

	@Value("${broker.topic.vessel}")
	String VESSEL_TOPIC;

	private Integer mmsi = 1;

	private String tstamp = "343232132";
@@ -63,10 +58,7 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest {
			KafkaEmbeddedConfig.PARTITIONS_PER_TOPIC, KafkaEmbeddedConfig.TOPICS_NAME);

	@Autowired
	VesselTrackingCommandService service;

	@Autowired
	VesselTrackingCommandHandler vesselTrackingCommandHandler;
	private KafkaTemplate<String, VesselTrackingDTO> kafkaTemplate;

	protected static BlockingQueue<Object> blockingQueue;

@@ -87,8 +79,11 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest {

		VesselTrackingDTO source = VesselTrackingDataUtil.getCreateEvent(mmsi, tstamp).getVesselTracking();

		// LLama directamente al servicio para evitar pasar por vessel
		service.create(source, activityId);
		kafkaTemplate.send(REALTIME_TRACKING_VESSELS_TOPIC, source.getId(), source);

		Thread.sleep(10000);

		kafkaTemplate.send(REALTIME_TRACKING_VESSELS_TOPIC, source.getId(), source);

		VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(4, TimeUnit.MINUTES);
		assertNotNull(vesselTracking);
@@ -112,9 +107,6 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest {
	@KafkaHandler
	public void listen(CreateVesselTrackingEvent createVesselTrackingEvent) throws Exception {

		// Resuelve la espera para que siga la ejecución
		Whitebox.invokeMethod(vesselTrackingCommandHandler, "resolveCommand", createVesselTrackingEvent.getSessionId());

		blockingQueue.offer(createVesselTrackingEvent.getVesselTracking());
	}