Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTrackingEventStreams.java +1 −2 Original line number Diff line number Diff line Loading @@ -256,7 +256,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { @Override protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) { createTrackingFromRealtimeTrackingVessel(realtimeTracking, snapshotEvents); createTrackingFromRealtimeTrackingVessel(realtimeTracking, events); } private void createTrackingFromRealtimeTrackingVessel(KStream<String, VesselTrackingDTO> realTimeTracking, Loading @@ -281,7 +281,6 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { evt.setAggregateId(vesselTrackingDTO.getId()); evt.setVersion(1); evt.setUserId(REDMIC_PROCESS); return evt; } return null; Loading vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltracking/CreateVesselTrackingFromAISTest.java +6 −2 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ package es.redmic.test.vesselscommands.integration.vesseltracking; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.concurrent.BlockingQueue; Loading Loading @@ -81,11 +82,11 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { kafkaTemplate.send(REALTIME_TRACKING_VESSELS_TOPIC, source.getId(), source); Thread.sleep(1000); Thread.sleep(3000); kafkaTemplate.send(REALTIME_TRACKING_VESSELS_TOPIC, source.getId(), source); VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(4, TimeUnit.MINUTES); VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(20, TimeUnit.SECONDS); assertNotNull(vesselTracking); assertTrue(vesselTracking.getProperties().getDate().isEqual(source.getProperties().getDate())); Loading @@ -102,6 +103,9 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { assertEquals(vessel.getName(), source.getProperties().getVessel().getName()); assertEquals(vessel.getLength(), source.getProperties().getVessel().getLength()); assertEquals(vessel.getBeam(), source.getProperties().getVessel().getBeam()); vesselTracking = (VesselTrackingDTO) blockingQueue.poll(20, TimeUnit.SECONDS); assertNull(vesselTracking); } @KafkaHandler Loading Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTrackingEventStreams.java +1 −2 Original line number Diff line number Diff line Loading @@ -256,7 +256,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { @Override protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) { createTrackingFromRealtimeTrackingVessel(realtimeTracking, snapshotEvents); createTrackingFromRealtimeTrackingVessel(realtimeTracking, events); } private void createTrackingFromRealtimeTrackingVessel(KStream<String, VesselTrackingDTO> realTimeTracking, Loading @@ -281,7 +281,6 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { evt.setAggregateId(vesselTrackingDTO.getId()); evt.setVersion(1); evt.setUserId(REDMIC_PROCESS); return evt; } return null; Loading
vessels-commands/src/test/java/es/redmic/test/vesselscommands/integration/vesseltracking/CreateVesselTrackingFromAISTest.java +6 −2 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ package es.redmic.test.vesselscommands.integration.vesseltracking; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.concurrent.BlockingQueue; Loading Loading @@ -81,11 +82,11 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { kafkaTemplate.send(REALTIME_TRACKING_VESSELS_TOPIC, source.getId(), source); Thread.sleep(1000); Thread.sleep(3000); kafkaTemplate.send(REALTIME_TRACKING_VESSELS_TOPIC, source.getId(), source); VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(4, TimeUnit.MINUTES); VesselTrackingDTO vesselTracking = (VesselTrackingDTO) blockingQueue.poll(20, TimeUnit.SECONDS); assertNotNull(vesselTracking); assertTrue(vesselTracking.getProperties().getDate().isEqual(source.getProperties().getDate())); Loading @@ -102,6 +103,9 @@ public class CreateVesselTrackingFromAISTest extends KafkaBaseIntegrationTest { assertEquals(vessel.getName(), source.getProperties().getVessel().getName()); assertEquals(vessel.getLength(), source.getProperties().getVessel().getLength()); assertEquals(vessel.getBeam(), source.getProperties().getVessel().getBeam()); vesselTracking = (VesselTrackingDTO) blockingQueue.poll(20, TimeUnit.SECONDS); assertNull(vesselTracking); } @KafkaHandler Loading