Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTrackingEventStreams.java +6 −6 Original line number Diff line number Diff line package es.redmic.vesselscommands.streams; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; Loading Loading @@ -262,11 +262,11 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { private void createTrackingFromRealtimeTrackingVessel(KStream<String, VesselTrackingDTO> realTimeTracking, KStream<String, Event> events) { realTimeTracking .leftJoin(events, KTable<String, Event> table = events.groupByKey().reduce((aggValue, newValue) -> newValue); realTimeTracking.leftJoin(table, (vesselTrackingDTO, vesselTrackingEvent) -> getCreateTrackingFromRealtimeTrackingVessel( vesselTrackingDTO, vesselTrackingEvent), JoinWindows.of(windowsTime)) vesselTrackingDTO, vesselTrackingEvent)) .filter((k, v) -> (v != null)).to(topic); } Loading Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTrackingEventStreams.java +6 −6 Original line number Diff line number Diff line package es.redmic.vesselscommands.streams; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; Loading Loading @@ -262,11 +262,11 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { private void createTrackingFromRealtimeTrackingVessel(KStream<String, VesselTrackingDTO> realTimeTracking, KStream<String, Event> events) { realTimeTracking .leftJoin(events, KTable<String, Event> table = events.groupByKey().reduce((aggValue, newValue) -> newValue); realTimeTracking.leftJoin(table, (vesselTrackingDTO, vesselTrackingEvent) -> getCreateTrackingFromRealtimeTrackingVessel( vesselTrackingDTO, vesselTrackingEvent), JoinWindows.of(windowsTime)) vesselTrackingDTO, vesselTrackingEvent)) .filter((k, v) -> (v != null)).to(topic); } Loading