Loading vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltracking/CreateVesselTrackingFromAISTest.java +32 −38 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.reflect.Whitebox; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; Loading @@ -36,11 +37,11 @@ import es.redmic.test.vesselscommands.integration.KafkaEmbeddedConfig; import es.redmic.test.vesselscommands.integration.vessel.VesselDataUtil; import es.redmic.testutils.kafka.KafkaBaseIntegrationTest; import es.redmic.vesselscommands.VesselsCommandsApplication; import es.redmic.vesselscommands.handler.VesselTrackingCommandHandler; import es.redmic.vesselscommands.service.VesselTrackingCommandService; import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO; import es.redmic.vesselslib.dto.vessel.VesselDTO; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEvent; import es.redmic.vesselslib.events.vesseltracking.create.EnrichCreateVesselTrackingEvent; @RunWith(SpringJUnit4ClassRunner.class) Loading @@ -60,14 +61,17 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { private Integer mmsi = 1; private VesselCreatedEvent vesselCreatedEvent = VesselDataUtil.getVesselCreatedEvent(mmsi); VesselDTO vessel; @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(KafkaEmbeddedConfig.NUM_BROKERS, true, KafkaEmbeddedConfig.PARTITIONS_PER_TOPIC, KafkaEmbeddedConfig.TOPICS_NAME); @Autowired private KafkaTemplate<String, AISTrackingDTO> kafkaTemplate; VesselTrackingCommandService service; @Autowired VesselTrackingCommandHandler vesselTrackingCommandHandler; @Autowired private KafkaTemplate<String, Event> kafkaTemplateEvent; Loading @@ -88,21 +92,23 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { @Test public void createVesselTracking_SendCreateVesselTrackingEvent_IfCommandWasSuccess() throws Exception { VesselDTO vesselSource = vesselCreatedEvent.getVessel(); VesselCreatedEvent vesselCreatedEvent = VesselDataUtil.getVesselCreatedEvent(mmsi); vessel = vesselCreatedEvent.getVessel(); AISTrackingDTO dto = new AISTrackingDTO(); dto.setMmsi(vesselSource.getMmsi()); dto.setImo(vesselSource.getImo()); dto.setName(vesselSource.getName()); dto.setType(Integer.parseInt(vesselSource.getType().getCode())); dto.setCallSign(vesselSource.getCallSign()); dto.setMmsi(vessel.getMmsi()); dto.setImo(vessel.getImo()); dto.setName(vessel.getName()); dto.setType(Integer.parseInt(vessel.getType().getCode())); dto.setCallSign(vessel.getCallSign()); dto.setTstamp(new DateTime()); dto.setLatitude(2.1); dto.setLongitude(3.2); dto.setA(vesselSource.getLength() / 2); dto.setB(vesselSource.getLength() / 2); dto.setC(vesselSource.getBeam() / 2); dto.setD(vesselSource.getBeam() / 2); dto.setA(vessel.getLength() / 2); dto.setB(vessel.getLength() / 2); dto.setC(vessel.getBeam() / 2); dto.setD(vessel.getBeam() / 2); dto.setCog(2.3); dto.setSog(3.4); dto.setHeading(221); Loading @@ -110,11 +116,15 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { dto.setEta("00:00 00:00"); dto.setDest("Santa Cruz de Tenerife"); ListenableFuture<SendResult<String, AISTrackingDTO>> future = kafkaTemplate .send(REALTIME_TRACKING_VESSELS_TOPIC, dto.getMmsi().toString(), dto); future.addCallback(new SendListener()); // Envía vessel para almacenarlo en kafka ListenableFuture<SendResult<String, Event>> futureCreatedEvent = kafkaTemplateEvent.send(VESSEL_TOPIC, vesselCreatedEvent.getAggregateId(), vesselCreatedEvent); futureCreatedEvent.addCallback(new SendListener()); VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(240, TimeUnit.SECONDS); // LLama directamente al servicio para evitar pasar por vessel service.create(dto); VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(4, TimeUnit.MINUTES); assertNotNull(vesselTracking); assertTrue(vesselTracking.getProperties().getDate().isEqual(dto.getTstamp())); Loading @@ -136,32 +146,16 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { } @KafkaListener(topics = "${broker.topic.vessel}", groupId = "CreateVesselTrackingFromAISTest") public void listen(Event event) { vesselCreatedEvent.setSessionId(event.getSessionId()); if (event.getType().equals(VesselEventTypes.ENRICH_CREATE)) { ListenableFuture<SendResult<String, Event>> futureCreatedEvent = kafkaTemplateEvent.send(VESSEL_TOPIC, "vessel-mmsi-" + mmsi, vesselCreatedEvent); futureCreatedEvent.addCallback(new SendListener()); } } @KafkaHandler public void listen(CreateVesselTrackingEvent createEvent) { public void listen(EnrichCreateVesselTrackingEvent createEvent) throws Exception { blockingQueue.offer(createEvent.getVesselTracking()); } @KafkaHandler public void listen(EnrichCreateVesselTrackingEvent createEvent) { // Resuelve la espera para que siga la ejecución Whitebox.invokeMethod(vesselTrackingCommandHandler, "resolveCommand", createEvent.getSessionId()); blockingQueue.offer(createEvent.getVesselTracking()); } @KafkaHandler(isDefault = true) public void defaultEvent(Object def) { public void defaultEvent(Event def) { } } Loading
vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltracking/CreateVesselTrackingFromAISTest.java +32 −38 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.reflect.Whitebox; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; Loading @@ -36,11 +37,11 @@ import es.redmic.test.vesselscommands.integration.KafkaEmbeddedConfig; import es.redmic.test.vesselscommands.integration.vessel.VesselDataUtil; import es.redmic.testutils.kafka.KafkaBaseIntegrationTest; import es.redmic.vesselscommands.VesselsCommandsApplication; import es.redmic.vesselscommands.handler.VesselTrackingCommandHandler; import es.redmic.vesselscommands.service.VesselTrackingCommandService; import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO; import es.redmic.vesselslib.dto.vessel.VesselDTO; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEvent; import es.redmic.vesselslib.events.vesseltracking.create.EnrichCreateVesselTrackingEvent; @RunWith(SpringJUnit4ClassRunner.class) Loading @@ -60,14 +61,17 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { private Integer mmsi = 1; private VesselCreatedEvent vesselCreatedEvent = VesselDataUtil.getVesselCreatedEvent(mmsi); VesselDTO vessel; @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(KafkaEmbeddedConfig.NUM_BROKERS, true, KafkaEmbeddedConfig.PARTITIONS_PER_TOPIC, KafkaEmbeddedConfig.TOPICS_NAME); @Autowired private KafkaTemplate<String, AISTrackingDTO> kafkaTemplate; VesselTrackingCommandService service; @Autowired VesselTrackingCommandHandler vesselTrackingCommandHandler; @Autowired private KafkaTemplate<String, Event> kafkaTemplateEvent; Loading @@ -88,21 +92,23 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { @Test public void createVesselTracking_SendCreateVesselTrackingEvent_IfCommandWasSuccess() throws Exception { VesselDTO vesselSource = vesselCreatedEvent.getVessel(); VesselCreatedEvent vesselCreatedEvent = VesselDataUtil.getVesselCreatedEvent(mmsi); vessel = vesselCreatedEvent.getVessel(); AISTrackingDTO dto = new AISTrackingDTO(); dto.setMmsi(vesselSource.getMmsi()); dto.setImo(vesselSource.getImo()); dto.setName(vesselSource.getName()); dto.setType(Integer.parseInt(vesselSource.getType().getCode())); dto.setCallSign(vesselSource.getCallSign()); dto.setMmsi(vessel.getMmsi()); dto.setImo(vessel.getImo()); dto.setName(vessel.getName()); dto.setType(Integer.parseInt(vessel.getType().getCode())); dto.setCallSign(vessel.getCallSign()); dto.setTstamp(new DateTime()); dto.setLatitude(2.1); dto.setLongitude(3.2); dto.setA(vesselSource.getLength() / 2); dto.setB(vesselSource.getLength() / 2); dto.setC(vesselSource.getBeam() / 2); dto.setD(vesselSource.getBeam() / 2); dto.setA(vessel.getLength() / 2); dto.setB(vessel.getLength() / 2); dto.setC(vessel.getBeam() / 2); dto.setD(vessel.getBeam() / 2); dto.setCog(2.3); dto.setSog(3.4); dto.setHeading(221); Loading @@ -110,11 +116,15 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { dto.setEta("00:00 00:00"); dto.setDest("Santa Cruz de Tenerife"); ListenableFuture<SendResult<String, AISTrackingDTO>> future = kafkaTemplate .send(REALTIME_TRACKING_VESSELS_TOPIC, dto.getMmsi().toString(), dto); future.addCallback(new SendListener()); // Envía vessel para almacenarlo en kafka ListenableFuture<SendResult<String, Event>> futureCreatedEvent = kafkaTemplateEvent.send(VESSEL_TOPIC, vesselCreatedEvent.getAggregateId(), vesselCreatedEvent); futureCreatedEvent.addCallback(new SendListener()); VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(240, TimeUnit.SECONDS); // LLama directamente al servicio para evitar pasar por vessel service.create(dto); VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(4, TimeUnit.MINUTES); assertNotNull(vesselTracking); assertTrue(vesselTracking.getProperties().getDate().isEqual(dto.getTstamp())); Loading @@ -136,32 +146,16 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { } @KafkaListener(topics = "${broker.topic.vessel}", groupId = "CreateVesselTrackingFromAISTest") public void listen(Event event) { vesselCreatedEvent.setSessionId(event.getSessionId()); if (event.getType().equals(VesselEventTypes.ENRICH_CREATE)) { ListenableFuture<SendResult<String, Event>> futureCreatedEvent = kafkaTemplateEvent.send(VESSEL_TOPIC, "vessel-mmsi-" + mmsi, vesselCreatedEvent); futureCreatedEvent.addCallback(new SendListener()); } } @KafkaHandler public void listen(CreateVesselTrackingEvent createEvent) { public void listen(EnrichCreateVesselTrackingEvent createEvent) throws Exception { blockingQueue.offer(createEvent.getVesselTracking()); } @KafkaHandler public void listen(EnrichCreateVesselTrackingEvent createEvent) { // Resuelve la espera para que siga la ejecución Whitebox.invokeMethod(vesselTrackingCommandHandler, "resolveCommand", createEvent.getSessionId()); blockingQueue.offer(createEvent.getVesselTracking()); } @KafkaHandler(isDefault = true) public void defaultEvent(Object def) { public void defaultEvent(Event def) { } }