Commit 95fddb83 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade vessel a partir de datos de ais

En lugar de meterlos vía comando, se hace un join entre los insertados y
los recibidos
Adapta tests
parent f093ffa6
Loading
Loading
Loading
Loading
+10 −7
Original line number Diff line number Diff line
@@ -71,6 +71,9 @@ public class VesselCommandHandler extends CommandHandler {
	@Value("${broker.topic.vessel-type}")
	private String vesselTypeTopic;

	@Value("${broker.topic.realtime.vessels}")
	private String realtimeVesselsTopic;

	@Value("${stream.windows.time.ms}")
	private Long streamWindowsTime;

@@ -111,7 +114,7 @@ public class VesselCommandHandler extends CommandHandler {
					.serviceId(vesselsEventsStreamId)
					.windowsTime(streamWindowsTime)
					.build(), vesselTypeTopic, vesselsAggByVesselTypeTopic,
						vesselTypeUpdatedTopic, alertService);
						vesselTypeUpdatedTopic, realtimeVesselsTopic, alertService);
		// @formatter:on
	}

@@ -210,7 +213,7 @@ public class VesselCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(VesselCreatedEvent event) {

		logger.info("Vessel creado " + event.getAggregateId());
		logger.debug("Vessel creado " + event.getAggregateId());

		// El evento Creado se envió desde el stream

@@ -226,7 +229,7 @@ public class VesselCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(VesselUpdatedEvent event) {

		logger.info("Vessel modificado " + event.getAggregateId());
		logger.debug("Vessel modificado " + event.getAggregateId());

		// Envía los editados satisfactoriamente para tenerlos en cuenta en el
		// postupdate
@@ -252,7 +255,7 @@ public class VesselCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(VesselDeletedEvent event) {

		logger.info("Vessel eliminado " + event.getAggregateId());
		logger.debug("Vessel eliminado " + event.getAggregateId());

		resolveCommand(event.getSessionId());
	}
@@ -267,7 +270,7 @@ public class VesselCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(CreateVesselCancelledEvent event) {

		logger.info("Error creando Vessel " + event.getAggregateId());
		logger.debug("Error creando Vessel " + event.getAggregateId());

		resolveCommand(event.getSessionId(),
				ExceptionFactory.getException(event.getExceptionType(), event.getArguments()));
@@ -276,7 +279,7 @@ public class VesselCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(UpdateVesselCancelledEvent event) {

		logger.info("Error modificando Vessel " + event.getAggregateId());
		logger.debug("Error modificando Vessel " + event.getAggregateId());

		// El evento Cancelled se envía desde el stream

@@ -294,7 +297,7 @@ public class VesselCommandHandler extends CommandHandler {
	@KafkaHandler
	private void listen(DeleteVesselCancelledEvent event) {

		logger.info("Error eliminando Vessel " + event.getAggregateId());
		logger.debug("Error eliminando Vessel " + event.getAggregateId());

		// El evento Cancelled se envía desde el stream

+61 −5
Original line number Diff line number Diff line
@@ -22,14 +22,18 @@ import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde;
import es.redmic.commandslib.exceptions.ExceptionType;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.streams.EventSourcingStreams;
import es.redmic.vesselscommands.commands.vessel.CreateVesselCommand;
import es.redmic.vesselscommands.commands.vessel.UpdateVesselCommand;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO;
import es.redmic.vesselslib.events.vessel.VesselEventFactory;
import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.common.VesselEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vessel.create.EnrichCreateVesselEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent;
import es.redmic.vesselslib.events.vessel.update.EnrichUpdateVesselEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes;
import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent;
@@ -42,7 +46,7 @@ public class VesselEventStreams extends EventSourcingStreams {

	private String vesselTypeUpdatedTopic;

	// private String vesselTrackingAggByVesselTopic;
	private String realtimeVesselsTopic;

	private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerdeAggregationVesselTypeInVessel;

@@ -52,15 +56,19 @@ public class VesselEventStreams extends EventSourcingStreams {

	private KStream<String, Event> vesselTypeEvents;

	private KStream<String, VesselDTO> realtimeVessel;

	private final String REDMIC_PROCESS = "REDMIC_PROCESS";

	public VesselEventStreams(StreamConfig config, String vesselTypeTopic, String vesselsAggByVesselTypeTopic,
			String vesselTypeUpdatedTopic, AlertService alertService) {
			String vesselTypeUpdatedTopic, String realtimeVesselsTopic, AlertService alertService) {
		super(config, alertService);
		this.vesselTypeTopic = vesselTypeTopic + snapshotTopicSuffix;
		this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic;
		this.vesselTypeUpdatedTopic = vesselTypeUpdatedTopic;
		this.hashMapSerdeAggregationVesselTypeInVessel = new HashMapSerde<>(schemaRegistry);
		this.realtimeVesselsTopic = realtimeVesselsTopic;

		logger.info("Arrancado servicio de streaming para event sourcing de Vessel con Id: " + this.serviceId);
		init();
	}

@@ -81,6 +89,8 @@ public class VesselEventStreams extends EventSourcingStreams {
		vesselType = builder.globalTable(vesselTypeTopic);

		vesselTypeEvents = builder.stream(vesselTypeUpdatedTopic);

		realtimeVessel = builder.stream(realtimeVesselsTopic);
	}

	/**
@@ -412,7 +422,7 @@ public class VesselEventStreams extends EventSourcingStreams {
							+ " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia "
							+ updateReferenceEvent.getAggregateId();

					logger.info(message);
					logger.error(message);
					alertService.errorAlert(aggregationEvent.getAggregateId(), message);
				}

@@ -429,6 +439,52 @@ public class VesselEventStreams extends EventSourcingStreams {
	}

	@Override
	protected void processExtraStreams(KStream<String, Event> events) {
	protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) {

		createVesselFromRealtimeVessel(realtimeVessel, snapshotEvents);
	}

	private void createVesselFromRealtimeVessel(KStream<String, VesselDTO> realtimeVessel,
			KStream<String, Event> events) {

		KTable<String, Event> table = events.groupByKey().reduce((aggValue, newValue) -> newValue);

		realtimeVessel
				.leftJoin(table, (vesselDTO, vesselEvent) -> getVesselEventFromRealtimeVessel(vesselDTO, vesselEvent))
				.filter((k, v) -> (v != null)).to(topic);
	}

	private Event getVesselEventFromRealtimeVessel(VesselDTO vesselDTO, Event vesselEvent) {

		if (vesselEvent == null) {
			return getEnrichCreateVesselEventFromRealtimeVessel(vesselDTO);
		}
		if (!((VesselEvent) vesselEvent).getVessel().equals(vesselDTO)) {
			return getEnrichUpdateVesselEventFromRealtimeVessel(vesselDTO, vesselEvent);
		}
		return null;
	}

	private Event getEnrichCreateVesselEventFromRealtimeVessel(VesselDTO vesselDTO) {

		vesselDTO = new CreateVesselCommand(vesselDTO).getVessel();

		EnrichCreateVesselEvent evt = new EnrichCreateVesselEvent(vesselDTO);
		evt.setAggregateId(vesselDTO.getId());
		evt.setVersion(1);
		evt.setUserId(REDMIC_PROCESS);
		return evt;
	}

	private Event getEnrichUpdateVesselEventFromRealtimeVessel(VesselDTO vesselDTO, Event vesselEvent) {

		VesselDTO vessel = new UpdateVesselCommand(((VesselEvent) vesselEvent).getVessel()).getVessel();
		vessel.copyFromAIS(vesselDTO);

		EnrichUpdateVesselEvent evt = new EnrichUpdateVesselEvent(vessel);
		evt.setAggregateId(vesselDTO.getId());
		evt.setVersion(vesselEvent.getVersion() + 1);
		evt.setUserId(REDMIC_PROCESS);
		return evt;
	}
}
+92 −30
Original line number Diff line number Diff line
package es.redmic.test.vesselscommands.integration.vessel;

/*-@RunWith(SpringJUnit4ClassRunner.class)
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import es.redmic.brokerlib.avro.common.Event;
import es.redmic.test.vesselscommands.integration.KafkaEmbeddedConfig;
import es.redmic.testutils.kafka.KafkaBaseIntegrationTest;
import es.redmic.vesselscommands.VesselsCommandsApplication;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vessel.update.EnrichUpdateVesselEvent;

@RunWith(SpringJUnit4ClassRunner.class)
@TestPropertySource(properties = { "spring.kafka.consumer.group-id=CreateVesselFromAIS", "schema.registry.port=18082" })
@SpringBootTest(classes = { VesselsCommandsApplication.class })
@ActiveProfiles("test")
@@ -8,15 +42,23 @@ package es.redmic.test.vesselscommands.integration.vessel;
@KafkaListener(topics = "${broker.topic.vessel}", groupId = "CreateVesselFromAISTest")
public class CreateVesselFromAISTest extends KafkaBaseIntegrationTest {

	@Value("${broker.topic.realtime.tracking.vessels}")
	String REALTIME_TRACKING_VESSELS_TOPIC;
	@Value("${broker.topic.realtime.vessels}")
	String REALTIME_VESSELS_TOPIC;

	@Value("${broker.topic.vessel}")
	String VESSEL_TOPIC;

	private Integer mmsi = 111;

	@ClassRule
	public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(KafkaEmbeddedConfig.NUM_BROKERS, true,
			KafkaEmbeddedConfig.PARTITIONS_PER_TOPIC, KafkaEmbeddedConfig.TOPICS_NAME);

	@Autowired
	private KafkaTemplate<String, AISTrackingDTO> kafkaTemplate;
	private KafkaTemplate<String, VesselDTO> kafkaTemplate;

	@Autowired
	private KafkaTemplate<String, Event> kafkaEventTemplate;

	protected static BlockingQueue<Object> blockingQueue;

@@ -35,33 +77,47 @@ public class CreateVesselFromAISTest extends KafkaBaseIntegrationTest {
	@Test
	public void createVessel_SendCreateVesselEvent_IfCommandWasSuccess() throws Exception {

		AISTrackingDTO dto = new AISTrackingDTO();
		dto.setMmsi(2);
		dto.setImo(1);
		dto.setName("Avatar");
		dto.setType(70);
		dto.setCallSign("1");
		dto.setTstamp(new DateTime());
		dto.setLatitude(2.1);
		dto.setLongitude(3.2);
		dto.setA(2.9);
		dto.setB(3.5);
		dto.setC(1.4);
		dto.setD(1.4);

		ListenableFuture<SendResult<String, AISTrackingDTO>> future = kafkaTemplate
				.send(REALTIME_TRACKING_VESSELS_TOPIC, "vessel-mmsi-" + dto.getMmsi(), dto);
		future.addCallback(new SendListener());

		VesselDTO vessel = (VesselDTO) blockingQueue.poll(120, TimeUnit.SECONDS);
		VesselDTO source = VesselDataUtil.getVessel(mmsi);

		// Envía un vessel simulando que llega desde ais. Al no haber llegado antes debe
		// generar un evento
		kafkaTemplate.send(REALTIME_VESSELS_TOPIC, source.getId(), source);

		VesselDTO vessel = (VesselDTO) blockingQueue.poll(30, TimeUnit.SECONDS);

		assertNotNull(vessel);
		assertEquals(vessel.getMmsi(), dto.getMmsi());
		assertEquals(vessel.getName(), dto.getName());
		Double length = dto.getA() + dto.getB();
		assertEquals(vessel.getLength(), length);
		assertEquals(vessel.getMmsi(), source.getMmsi());
		assertEquals(vessel.getName(), source.getName());
		assertEquals(vessel.getLength(), source.getLength());
		assertEquals(vessel.getBeam(), source.getBeam());

		Thread.sleep(3000);

		// Envía un evento de vessel creado para simularlo. Al ya estar creado, solo
		// genera evento si ha cambiado
		kafkaEventTemplate.send(VESSEL_TOPIC, source.getId(), VesselDataUtil.getVesselCreatedEvent(mmsi));

		Thread.sleep(3000);

		// Al estar ya añadido, si
		// llega de nuevo el mismo barco, lo descartará y no llegará nada

		Double beam = dto.getC() + dto.getD();
		assertEquals(vessel.getBeam(), beam);
		kafkaTemplate.send(REALTIME_VESSELS_TOPIC, source.getId(), source);

		vessel = (VesselDTO) blockingQueue.poll(30, TimeUnit.SECONDS);
		assertNull(vessel);

		// En este caso se cambia el nombre y se comprueba que genera un
		// evento para modificarlo
		source.setName("Otro");

		kafkaTemplate.send(REALTIME_VESSELS_TOPIC, source.getId(), source);

		Thread.sleep(3000);

		vessel = (VesselDTO) blockingQueue.poll(30, TimeUnit.SECONDS);
		assertNotNull(vessel);
		assertEquals("Otro", vessel.getName());
	}

	@KafkaHandler
@@ -70,8 +126,14 @@ public class CreateVesselFromAISTest extends KafkaBaseIntegrationTest {
		blockingQueue.offer(createVesselEnrichedEvent.getVessel());
	}

	@KafkaHandler
	public void listen(EnrichUpdateVesselEvent enrichUpdateVesselEvent) {

		blockingQueue.offer(enrichUpdateVesselEvent.getVessel());
	}

	@KafkaHandler(isDefault = true)
	public void defaultEvent(Object def) {

	}
}-*/
 No newline at end of file
}
 No newline at end of file