Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTrackingEventStreams.java +2 −2 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { public VesselTrackingEventStreams(StreamConfig config, String vesselTopic, String vesselTrackingAggByVesselTopic, String vesselUpdatedTopic, AlertService alertService) { super(config, alertService); this.vesselTopic = vesselTopic; this.vesselTopic = vesselTopic + snapshotTopicSuffix; this.vesselTrackingAggByVesselTopic = vesselTrackingAggByVesselTopic; this.vesselUpdatedTopic = vesselUpdatedTopic; this.hashMapSerde = new HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent>(schemaRegistry); Loading Loading @@ -101,7 +101,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { .selectKey((k, v) -> getVesselIdFromVesselTracking(v)); enrichCreateEvents .join(vessel, (k, v) -> k, .leftJoin(vessel, (k, v) -> k, (enrichCreateEvent, vesselEvent) -> getEnrichCreateResultEvent(enrichCreateEvent, vesselEvent)) .selectKey((k, v) -> v.getAggregateId()).to(topic); } Loading Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTrackingEventStreams.java +2 −2 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { public VesselTrackingEventStreams(StreamConfig config, String vesselTopic, String vesselTrackingAggByVesselTopic, String vesselUpdatedTopic, AlertService alertService) { super(config, alertService); this.vesselTopic = vesselTopic; this.vesselTopic = vesselTopic + snapshotTopicSuffix; this.vesselTrackingAggByVesselTopic = vesselTrackingAggByVesselTopic; this.vesselUpdatedTopic = vesselUpdatedTopic; this.hashMapSerde = new HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent>(schemaRegistry); Loading Loading @@ -101,7 +101,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { .selectKey((k, v) -> getVesselIdFromVesselTracking(v)); enrichCreateEvents .join(vessel, (k, v) -> k, .leftJoin(vessel, (k, v) -> k, (enrichCreateEvent, vesselEvent) -> getEnrichCreateResultEvent(enrichCreateEvent, vesselEvent)) .selectKey((k, v) -> v.getAggregateId()).to(topic); } Loading