Loading vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltracking/CreateVesselTrackingFromAISTest.java +4 −17 Original line number Diff line number Diff line Loading @@ -21,18 +21,14 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.util.concurrent.ListenableFuture; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.geodata.tracking.vessels.AISTrackingDTO; import es.redmic.brokerlib.listener.SendListener; import es.redmic.test.vesselscommands.integration.KafkaEmbeddedConfig; import es.redmic.test.vesselscommands.integration.vessel.VesselDataUtil; import es.redmic.testutils.kafka.KafkaBaseIntegrationTest; Loading @@ -42,7 +38,7 @@ import es.redmic.vesselscommands.service.VesselTrackingCommandService; import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO; import es.redmic.vesselslib.dto.vessel.VesselDTO; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEnrichedEvent; import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEvent; @RunWith(SpringJUnit4ClassRunner.class) @TestPropertySource(properties = { "spring.kafka.consumer.group-id=CreateVesselTrackingFromAIS", Loading Loading @@ -73,9 +69,6 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { @Autowired VesselTrackingCommandHandler vesselTrackingCommandHandler; @Autowired private KafkaTemplate<String, Event> kafkaTemplateEvent; protected static BlockingQueue<Object> blockingQueue; @PostConstruct Loading Loading @@ -117,11 +110,6 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { dto.setEta("00:00 00:00"); dto.setDest("Santa Cruz de Tenerife"); // Envía vessel para almacenarlo en kafka ListenableFuture<SendResult<String, Event>> futureCreatedEvent = kafkaTemplateEvent.send(VESSEL_TOPIC, vesselCreatedEvent.getAggregateId(), vesselCreatedEvent); futureCreatedEvent.addCallback(new SendListener()); // LLama directamente al servicio para evitar pasar por vessel service.create(dto); Loading @@ -148,13 +136,12 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { } @KafkaHandler public void listen(CreateVesselTrackingEnrichedEvent createVesselTrackingEnrichedEvent) throws Exception { public void listen(CreateVesselTrackingEvent createVesselTrackingEvent) throws Exception { // Resuelve la espera para que siga la ejecución Whitebox.invokeMethod(vesselTrackingCommandHandler, "resolveCommand", createVesselTrackingEnrichedEvent.getSessionId()); Whitebox.invokeMethod(vesselTrackingCommandHandler, "resolveCommand", createVesselTrackingEvent.getSessionId()); blockingQueue.offer(createVesselTrackingEnrichedEvent.getVesselTracking()); blockingQueue.offer(createVesselTrackingEvent.getVesselTracking()); } @KafkaHandler(isDefault = true) Loading Loading
vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltracking/CreateVesselTrackingFromAISTest.java +4 −17 Original line number Diff line number Diff line Loading @@ -21,18 +21,14 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.util.concurrent.ListenableFuture; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.geodata.tracking.vessels.AISTrackingDTO; import es.redmic.brokerlib.listener.SendListener; import es.redmic.test.vesselscommands.integration.KafkaEmbeddedConfig; import es.redmic.test.vesselscommands.integration.vessel.VesselDataUtil; import es.redmic.testutils.kafka.KafkaBaseIntegrationTest; Loading @@ -42,7 +38,7 @@ import es.redmic.vesselscommands.service.VesselTrackingCommandService; import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO; import es.redmic.vesselslib.dto.vessel.VesselDTO; import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent; import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEnrichedEvent; import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEvent; @RunWith(SpringJUnit4ClassRunner.class) @TestPropertySource(properties = { "spring.kafka.consumer.group-id=CreateVesselTrackingFromAIS", Loading Loading @@ -73,9 +69,6 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { @Autowired VesselTrackingCommandHandler vesselTrackingCommandHandler; @Autowired private KafkaTemplate<String, Event> kafkaTemplateEvent; protected static BlockingQueue<Object> blockingQueue; @PostConstruct Loading Loading @@ -117,11 +110,6 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { dto.setEta("00:00 00:00"); dto.setDest("Santa Cruz de Tenerife"); // Envía vessel para almacenarlo en kafka ListenableFuture<SendResult<String, Event>> futureCreatedEvent = kafkaTemplateEvent.send(VESSEL_TOPIC, vesselCreatedEvent.getAggregateId(), vesselCreatedEvent); futureCreatedEvent.addCallback(new SendListener()); // LLama directamente al servicio para evitar pasar por vessel service.create(dto); Loading @@ -148,13 +136,12 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { } @KafkaHandler public void listen(CreateVesselTrackingEnrichedEvent createVesselTrackingEnrichedEvent) throws Exception { public void listen(CreateVesselTrackingEvent createVesselTrackingEvent) throws Exception { // Resuelve la espera para que siga la ejecución Whitebox.invokeMethod(vesselTrackingCommandHandler, "resolveCommand", createVesselTrackingEnrichedEvent.getSessionId()); Whitebox.invokeMethod(vesselTrackingCommandHandler, "resolveCommand", createVesselTrackingEvent.getSessionId()); blockingQueue.offer(createVesselTrackingEnrichedEvent.getVesselTracking()); blockingQueue.offer(createVesselTrackingEvent.getVesselTracking()); } @KafkaHandler(isDefault = true) Loading