Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselEventStreams.java +2 −2 Original line number Diff line number Diff line Loading @@ -224,9 +224,9 @@ public class VesselEventStreams extends EventStreams { }, Materialized.with(Serdes.String(), new ArrayListSerde<>(schemaRegistry))); // Vesseltypes modificados KStream<String, VesselTypeEvent> vesselTypeEvents = builder.stream(vesselTypeTopic); KStream<String, Event> vesselTypeEvents = builder.stream(vesselTypeTopic); KStream<String, VesselTypeEvent> updateReferenceEvents = vesselTypeEvents KStream<String, Event> updateReferenceEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.VESSELTYPE_UPDATED.toString().equals(event.getType()))); KStream<String, ArrayList<VesselEvent>> join = updateReferenceEvents.join(vesselEventsTable, Loading Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselEventStreams.java +2 −2 Original line number Diff line number Diff line Loading @@ -224,9 +224,9 @@ public class VesselEventStreams extends EventStreams { }, Materialized.with(Serdes.String(), new ArrayListSerde<>(schemaRegistry))); // Vesseltypes modificados KStream<String, VesselTypeEvent> vesselTypeEvents = builder.stream(vesselTypeTopic); KStream<String, Event> vesselTypeEvents = builder.stream(vesselTypeTopic); KStream<String, VesselTypeEvent> updateReferenceEvents = vesselTypeEvents KStream<String, Event> updateReferenceEvents = vesselTypeEvents .filter((id, event) -> (VesselTypeEventType.VESSELTYPE_UPDATED.toString().equals(event.getType()))); KStream<String, ArrayList<VesselEvent>> join = updateReferenceEvents.join(vesselEventsTable, Loading