Loading vessels-commands/src/main/java/es/redmic/vesselscommands/handler/VesselCommandHandler.java +2 −8 Original line number Diff line number Diff line Loading @@ -53,12 +53,6 @@ public class VesselCommandHandler extends CommandHandler { @Value("${broker.topic.vessel}") private String vesselTopic; @Value("${broker.topic.vessel.updated}") private String vesselUpdatedTopic; @Value("${broker.topic.tracking.agg.by.vessel}") private String vesselTrackingAggByVesselTopic; @Value("${broker.topic.vessel.type.updated}") private String vesselTypeUpdatedTopic; Loading Loading @@ -117,7 +111,7 @@ public class VesselCommandHandler extends CommandHandler { .serviceId(vesselsEventsStreamId) .windowsTime(streamWindowsTime) .build(), vesselTypeTopic, vesselsAggByVesselTypeTopic, vesselTypeUpdatedTopic, vesselTrackingAggByVesselTopic, alertService); vesselTypeUpdatedTopic, alertService); // @formatter:on } Loading Loading @@ -236,7 +230,7 @@ public class VesselCommandHandler extends CommandHandler { // Envía los editados satisfactoriamente para tenerlos en cuenta en el // postupdate publishToKafka(event, vesselUpdatedTopic); // publishToKafka(event, vesselUpdatedTopic); // El evento Modificado se envió desde el stream Loading vessels-commands/src/main/java/es/redmic/vesselscommands/handler/VesselTrackingCommandHandler.java +1 −7 Original line number Diff line number Diff line Loading @@ -51,12 +51,6 @@ public class VesselTrackingCommandHandler extends CommandHandler { @Value("${broker.topic.vessel-tracking}") private String vesselTrackingTopic; @Value("${broker.topic.vessel.updated}") private String vesselUpdatedTopic; @Value("${broker.topic.tracking.agg.by.vessel}") private String vesselTrackingAggByVesselTopic; @Value("${broker.state.store.vesseltracking.dir}") private String stateStoreVesselTrackingDir; Loading Loading @@ -108,7 +102,7 @@ public class VesselTrackingCommandHandler extends CommandHandler { config .serviceId(vesselTrackingEventsStreamId) .windowsTime(streamWindowsTime) .build(), vesselTopic, vesselTrackingAggByVesselTopic, vesselUpdatedTopic, alertService); .build(), vesselTopic, alertService); // @formatter:on } Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselEventStreams.java +16 −20 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; Loading @@ -18,7 +19,6 @@ import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.common.EventError; import es.redmic.brokerlib.avro.common.EventTypes; import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde; import es.redmic.commandslib.exceptions.ExceptionType; import es.redmic.commandslib.streaming.common.StreamConfig; import es.redmic.commandslib.streaming.streams.EventSourcingStreams; import es.redmic.vesselslib.dto.vessel.VesselDTO; Loading @@ -30,7 +30,6 @@ import es.redmic.vesselslib.events.vessel.create.CreateVesselEnrichedEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEnrichedEvent; import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.AggregationVesselInVesselTrackingPostUpdateEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; Loading @@ -42,7 +41,7 @@ public class VesselEventStreams extends EventSourcingStreams { private String vesselTypeUpdatedTopic; private String vesselTrackingAggByVesselTopic; // private String vesselTrackingAggByVesselTopic; private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerdeAggregationVesselTypeInVessel; Loading @@ -52,19 +51,13 @@ public class VesselEventStreams extends EventSourcingStreams { private KStream<String, Event> vesselTypeEvents; private HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent> hashMapSerdeAggregationVesselInVesselTracking; private KTable<String, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent>> aggByVessel; public VesselEventStreams(StreamConfig config, String vesselTypeTopic, String vesselsAggByVesselTypeTopic, String vesselTypeUpdatedTopic, String vesselTrackingAggByVesselTopic, AlertService alertService) { String vesselTypeUpdatedTopic, AlertService alertService) { super(config, alertService); this.vesselTypeTopic = vesselTypeTopic + snapshotTopicSuffix; this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic; this.vesselTypeUpdatedTopic = vesselTypeUpdatedTopic; this.vesselTrackingAggByVesselTopic = vesselTrackingAggByVesselTopic; this.hashMapSerdeAggregationVesselTypeInVessel = new HashMapSerde<>(schemaRegistry); this.hashMapSerdeAggregationVesselInVesselTracking = new HashMapSerde<>(schemaRegistry); logger.info("Arrancado servicio de streaming para event sourcing de Vessel con Id: " + this.serviceId); init(); Loading @@ -87,9 +80,6 @@ public class VesselEventStreams extends EventSourcingStreams { vesselType = builder.globalTable(vesselTypeTopic); vesselTypeEvents = builder.stream(vesselTypeUpdatedTopic); aggByVessel = builder.table(vesselTrackingAggByVesselTopic, Consumed.with(Serdes.String(), hashMapSerdeAggregationVesselInVesselTracking)); } /** Loading Loading @@ -228,13 +218,19 @@ public class VesselEventStreams extends EventSourcingStreams { KStream<String, Event> deleteEvents = events .filter((id, event) -> (EventTypes.CHECK_DELETE.equals(event.getType()))); deleteEvents // TODO: Buscar la manera de comprobar si el barco está en algún track deleteEvents.map( (key, value) -> KeyValue.pair(key, VesselEventFactory.getEvent(value, VesselEventTypes.DELETE_CHECKED))) .to(topic); ; /*-deleteEvents .leftJoin(aggByVessel, (deleteEvent, vesselTrackingAggByVessel) -> getCheckDeleteResultEvent(deleteEvent, vesselTrackingAggByVessel)) .to(topic); .to(topic);-*/ } @SuppressWarnings("serial") /*-@SuppressWarnings("serial") private Event getCheckDeleteResultEvent(Event deleteEvent, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> vesselTrackingAggByVessel) { Loading @@ -250,7 +246,7 @@ public class VesselEventStreams extends EventSourcingStreams { } }); } } }-*/ /** * Función que a partir del último evento correcto + el evento de edición Loading vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTrackingEventStreams.java +5 −119 Original line number Diff line number Diff line package es.redmic.vesselscommands.streams; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.common.EventError; import es.redmic.brokerlib.avro.common.EventTypes; import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde; import es.redmic.commandslib.streaming.common.StreamConfig; import es.redmic.commandslib.streaming.streams.EventSourcingStreams; import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO; import es.redmic.vesselslib.dto.vessel.VesselDTO; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselEvent; import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventFactory; import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventTypes; import es.redmic.vesselslib.events.vesseltracking.common.VesselTrackingEvent; import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEnrichedEvent; import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.AggregationVesselInVesselTrackingPostUpdateEvent; import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.UpdateVesselInVesselTrackingEvent; import es.redmic.vesselslib.events.vesseltracking.update.UpdateVesselTrackingEnrichedEvent; Loading @@ -36,32 +25,17 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { private String vesselTopic; private String vesselTrackingAggByVesselTopic; private String vesselUpdatedTopic; private HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent> hashMapSerde; private GlobalKTable<String, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent>> aggByVessel; private GlobalKTable<String, Event> vessel; private KStream<String, Event> vesselEvents; public VesselTrackingEventStreams(StreamConfig config, String vesselTopic, String vesselTrackingAggByVesselTopic, String vesselUpdatedTopic, AlertService alertService) { public VesselTrackingEventStreams(StreamConfig config, String vesselTopic, AlertService alertService) { super(config, alertService); this.vesselTopic = vesselTopic + snapshotTopicSuffix; this.vesselTrackingAggByVesselTopic = vesselTrackingAggByVesselTopic; this.vesselUpdatedTopic = vesselUpdatedTopic; this.hashMapSerde = new HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent>(schemaRegistry); logger.info("Arrancado servicio de streaming para event sourcing de Vessel tracking con Id: " + this.serviceId); init(); } /** * Crea GlobalKTable de vesselTracking agregados por vessel * * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams# * createExtraStreams() Loading @@ -69,13 +43,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { @Override protected void createExtraStreams() { // Crea un store global para procesar los datos de todas las instancias de // vesselTracking agregados por vessel aggByVessel = builder.globalTable(vesselTrackingAggByVesselTopic, Consumed.with(Serdes.String(), hashMapSerde)); vessel = builder.globalTable(vesselTopic); vesselEvents = builder.stream(vesselUpdatedTopic); } /** Loading Loading @@ -314,95 +282,13 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { vesselTracking, eventError.getExceptionType(), eventError.getArguments()); } /** * Función para procesar modificaciones de referencias */ @Override protected void processPostUpdateStream(KStream<String, Event> vesselTrackingEvents) { KStream<String, Event> vesselTrackingEventsStream = vesselTrackingEvents.filter((id, event) -> { return (event instanceof VesselTrackingEvent); }).selectKey((k, v) -> getVesselIdFromVesselTracking(v)); // Para cada una de las referencias // Agregar por vesseltype aggregateVesselTrackingByVessel(vesselTrackingEventsStream); // processar los vessel modificados processVesselPostUpdate(); } private String getVesselIdFromVesselTracking(Event evt) { public static String getVesselIdFromVesselTracking(Event evt) { return ((VesselTrackingEvent) evt).getVesselTracking().getProperties().getVessel().getId(); } private void aggregateVesselTrackingByVessel(KStream<String, Event> vesselTrackingEventsStream) { vesselTrackingEventsStream.groupByKey() .aggregate(HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent>::new, (k, v, map) -> aggregateVesselTrackingByVessel(k, v, map), Materialized.with(Serdes.String(), hashMapSerde)) .toStream().to(vesselTrackingAggByVesselTopic, Produced.with(Serdes.String(), hashMapSerde)); } private void processVesselPostUpdate() { KStream<String, ArrayList<UpdateVesselInVesselTrackingEvent>> join = vesselEvents.join(aggByVessel, (k, v) -> k, (updateReferenceEvent, vesselTrackingWithReferenceEvents) -> getPostUpdateEvent(updateReferenceEvent, vesselTrackingWithReferenceEvents)); // desagregar, cambiar clave por la de vesselTracking y enviar a topic join.flatMapValues(value -> value).selectKey((k, v) -> v.getAggregateId()).to(topic); } private HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> aggregateVesselTrackingByVessel( String key, Event value, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> hashMap) { VesselDTO vessel = ((VesselTrackingEvent) value).getVesselTracking().getProperties().getVessel(); if (vessel != null) { hashMap.put(value.getAggregateId(), new AggregationVesselInVesselTrackingPostUpdateEvent(value.getType(), vessel).buildFrom(value)); } return hashMap; } private ArrayList<UpdateVesselInVesselTrackingEvent> getPostUpdateEvent(Event updateReferenceEvent, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> vesselTrackingWithReferenceEvents) { ArrayList<UpdateVesselInVesselTrackingEvent> result = new ArrayList<>(); VesselDTO vessel = ((VesselEvent) updateReferenceEvent).getVessel(); for (Map.Entry<String, AggregationVesselInVesselTrackingPostUpdateEvent> entry : vesselTrackingWithReferenceEvents .entrySet()) { AggregationVesselInVesselTrackingPostUpdateEvent aggregationEvent = entry.getValue(); if (VesselTrackingEventTypes.isLocked(aggregationEvent.getType())) { if (!aggregationEvent.getType().equals(VesselTrackingEventTypes.DELETED)) { String message = "Item con id " + aggregationEvent.getAggregateId() + " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia " + updateReferenceEvent.getAggregateId(); logger.info(message); alertService.errorAlert(aggregationEvent.getAggregateId(), message); } } else if (!aggregationEvent.getVessel().equals(vessel)) { result.add((UpdateVesselInVesselTrackingEvent) VesselTrackingEventFactory.getEvent(aggregationEvent, updateReferenceEvent, VesselTrackingEventTypes.UPDATE_VESSEL)); } else { logger.debug("Vessel ya estaba actualizado o los campos indexados no han cambiado "); } } return result; @Override protected void processPostUpdateStream(KStream<String, Event> events) { // En series temporales no se hace postUpdate } } vessels-commands/src/main/resources/application.properties +0 −6 Original line number Diff line number Diff line Loading @@ -84,12 +84,6 @@ broker.topic.vessel-tracking=vessel-tracking #topic del broker para enviar/recibir eventos de barcos broker.topic.vessel=vessel # topic para aggregar tracking por vessel broker.topic.tracking.agg.by.vessel=tracking-agg-by-vessel #topic para enviar eventos de vessel modificados para postupdate broker.topic.vessel.updated=vessel-updated #topic del broker para enviar/recibir eventos de tipos de barcos broker.topic.vessel-type=vessel-type Loading Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/handler/VesselCommandHandler.java +2 −8 Original line number Diff line number Diff line Loading @@ -53,12 +53,6 @@ public class VesselCommandHandler extends CommandHandler { @Value("${broker.topic.vessel}") private String vesselTopic; @Value("${broker.topic.vessel.updated}") private String vesselUpdatedTopic; @Value("${broker.topic.tracking.agg.by.vessel}") private String vesselTrackingAggByVesselTopic; @Value("${broker.topic.vessel.type.updated}") private String vesselTypeUpdatedTopic; Loading Loading @@ -117,7 +111,7 @@ public class VesselCommandHandler extends CommandHandler { .serviceId(vesselsEventsStreamId) .windowsTime(streamWindowsTime) .build(), vesselTypeTopic, vesselsAggByVesselTypeTopic, vesselTypeUpdatedTopic, vesselTrackingAggByVesselTopic, alertService); vesselTypeUpdatedTopic, alertService); // @formatter:on } Loading Loading @@ -236,7 +230,7 @@ public class VesselCommandHandler extends CommandHandler { // Envía los editados satisfactoriamente para tenerlos en cuenta en el // postupdate publishToKafka(event, vesselUpdatedTopic); // publishToKafka(event, vesselUpdatedTopic); // El evento Modificado se envió desde el stream Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/handler/VesselTrackingCommandHandler.java +1 −7 Original line number Diff line number Diff line Loading @@ -51,12 +51,6 @@ public class VesselTrackingCommandHandler extends CommandHandler { @Value("${broker.topic.vessel-tracking}") private String vesselTrackingTopic; @Value("${broker.topic.vessel.updated}") private String vesselUpdatedTopic; @Value("${broker.topic.tracking.agg.by.vessel}") private String vesselTrackingAggByVesselTopic; @Value("${broker.state.store.vesseltracking.dir}") private String stateStoreVesselTrackingDir; Loading Loading @@ -108,7 +102,7 @@ public class VesselTrackingCommandHandler extends CommandHandler { config .serviceId(vesselTrackingEventsStreamId) .windowsTime(streamWindowsTime) .build(), vesselTopic, vesselTrackingAggByVesselTopic, vesselUpdatedTopic, alertService); .build(), vesselTopic, alertService); // @formatter:on } Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselEventStreams.java +16 −20 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; Loading @@ -18,7 +19,6 @@ import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.common.EventError; import es.redmic.brokerlib.avro.common.EventTypes; import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde; import es.redmic.commandslib.exceptions.ExceptionType; import es.redmic.commandslib.streaming.common.StreamConfig; import es.redmic.commandslib.streaming.streams.EventSourcingStreams; import es.redmic.vesselslib.dto.vessel.VesselDTO; Loading @@ -30,7 +30,6 @@ import es.redmic.vesselslib.events.vessel.create.CreateVesselEnrichedEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent; import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent; import es.redmic.vesselslib.events.vessel.update.UpdateVesselEnrichedEvent; import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.AggregationVesselInVesselTrackingPostUpdateEvent; import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes; import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent; Loading @@ -42,7 +41,7 @@ public class VesselEventStreams extends EventSourcingStreams { private String vesselTypeUpdatedTopic; private String vesselTrackingAggByVesselTopic; // private String vesselTrackingAggByVesselTopic; private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerdeAggregationVesselTypeInVessel; Loading @@ -52,19 +51,13 @@ public class VesselEventStreams extends EventSourcingStreams { private KStream<String, Event> vesselTypeEvents; private HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent> hashMapSerdeAggregationVesselInVesselTracking; private KTable<String, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent>> aggByVessel; public VesselEventStreams(StreamConfig config, String vesselTypeTopic, String vesselsAggByVesselTypeTopic, String vesselTypeUpdatedTopic, String vesselTrackingAggByVesselTopic, AlertService alertService) { String vesselTypeUpdatedTopic, AlertService alertService) { super(config, alertService); this.vesselTypeTopic = vesselTypeTopic + snapshotTopicSuffix; this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic; this.vesselTypeUpdatedTopic = vesselTypeUpdatedTopic; this.vesselTrackingAggByVesselTopic = vesselTrackingAggByVesselTopic; this.hashMapSerdeAggregationVesselTypeInVessel = new HashMapSerde<>(schemaRegistry); this.hashMapSerdeAggregationVesselInVesselTracking = new HashMapSerde<>(schemaRegistry); logger.info("Arrancado servicio de streaming para event sourcing de Vessel con Id: " + this.serviceId); init(); Loading @@ -87,9 +80,6 @@ public class VesselEventStreams extends EventSourcingStreams { vesselType = builder.globalTable(vesselTypeTopic); vesselTypeEvents = builder.stream(vesselTypeUpdatedTopic); aggByVessel = builder.table(vesselTrackingAggByVesselTopic, Consumed.with(Serdes.String(), hashMapSerdeAggregationVesselInVesselTracking)); } /** Loading Loading @@ -228,13 +218,19 @@ public class VesselEventStreams extends EventSourcingStreams { KStream<String, Event> deleteEvents = events .filter((id, event) -> (EventTypes.CHECK_DELETE.equals(event.getType()))); deleteEvents // TODO: Buscar la manera de comprobar si el barco está en algún track deleteEvents.map( (key, value) -> KeyValue.pair(key, VesselEventFactory.getEvent(value, VesselEventTypes.DELETE_CHECKED))) .to(topic); ; /*-deleteEvents .leftJoin(aggByVessel, (deleteEvent, vesselTrackingAggByVessel) -> getCheckDeleteResultEvent(deleteEvent, vesselTrackingAggByVessel)) .to(topic); .to(topic);-*/ } @SuppressWarnings("serial") /*-@SuppressWarnings("serial") private Event getCheckDeleteResultEvent(Event deleteEvent, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> vesselTrackingAggByVessel) { Loading @@ -250,7 +246,7 @@ public class VesselEventStreams extends EventSourcingStreams { } }); } } }-*/ /** * Función que a partir del último evento correcto + el evento de edición Loading
vessels-commands/src/main/java/es/redmic/vesselscommands/streams/VesselTrackingEventStreams.java +5 −119 Original line number Diff line number Diff line package es.redmic.vesselscommands.streams; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import es.redmic.brokerlib.alert.AlertService; import es.redmic.brokerlib.avro.common.Event; import es.redmic.brokerlib.avro.common.EventError; import es.redmic.brokerlib.avro.common.EventTypes; import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde; import es.redmic.commandslib.streaming.common.StreamConfig; import es.redmic.commandslib.streaming.streams.EventSourcingStreams; import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO; import es.redmic.vesselslib.dto.vessel.VesselDTO; import es.redmic.vesselslib.events.vessel.VesselEventTypes; import es.redmic.vesselslib.events.vessel.common.VesselEvent; import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventFactory; import es.redmic.vesselslib.events.vesseltracking.VesselTrackingEventTypes; import es.redmic.vesselslib.events.vesseltracking.common.VesselTrackingEvent; import es.redmic.vesselslib.events.vesseltracking.create.CreateVesselTrackingEnrichedEvent; import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.AggregationVesselInVesselTrackingPostUpdateEvent; import es.redmic.vesselslib.events.vesseltracking.partialupdate.vessel.UpdateVesselInVesselTrackingEvent; import es.redmic.vesselslib.events.vesseltracking.update.UpdateVesselTrackingEnrichedEvent; Loading @@ -36,32 +25,17 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { private String vesselTopic; private String vesselTrackingAggByVesselTopic; private String vesselUpdatedTopic; private HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent> hashMapSerde; private GlobalKTable<String, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent>> aggByVessel; private GlobalKTable<String, Event> vessel; private KStream<String, Event> vesselEvents; public VesselTrackingEventStreams(StreamConfig config, String vesselTopic, String vesselTrackingAggByVesselTopic, String vesselUpdatedTopic, AlertService alertService) { public VesselTrackingEventStreams(StreamConfig config, String vesselTopic, AlertService alertService) { super(config, alertService); this.vesselTopic = vesselTopic + snapshotTopicSuffix; this.vesselTrackingAggByVesselTopic = vesselTrackingAggByVesselTopic; this.vesselUpdatedTopic = vesselUpdatedTopic; this.hashMapSerde = new HashMapSerde<String, AggregationVesselInVesselTrackingPostUpdateEvent>(schemaRegistry); logger.info("Arrancado servicio de streaming para event sourcing de Vessel tracking con Id: " + this.serviceId); init(); } /** * Crea GlobalKTable de vesselTracking agregados por vessel * * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams# * createExtraStreams() Loading @@ -69,13 +43,7 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { @Override protected void createExtraStreams() { // Crea un store global para procesar los datos de todas las instancias de // vesselTracking agregados por vessel aggByVessel = builder.globalTable(vesselTrackingAggByVesselTopic, Consumed.with(Serdes.String(), hashMapSerde)); vessel = builder.globalTable(vesselTopic); vesselEvents = builder.stream(vesselUpdatedTopic); } /** Loading Loading @@ -314,95 +282,13 @@ public class VesselTrackingEventStreams extends EventSourcingStreams { vesselTracking, eventError.getExceptionType(), eventError.getArguments()); } /** * Función para procesar modificaciones de referencias */ @Override protected void processPostUpdateStream(KStream<String, Event> vesselTrackingEvents) { KStream<String, Event> vesselTrackingEventsStream = vesselTrackingEvents.filter((id, event) -> { return (event instanceof VesselTrackingEvent); }).selectKey((k, v) -> getVesselIdFromVesselTracking(v)); // Para cada una de las referencias // Agregar por vesseltype aggregateVesselTrackingByVessel(vesselTrackingEventsStream); // processar los vessel modificados processVesselPostUpdate(); } private String getVesselIdFromVesselTracking(Event evt) { public static String getVesselIdFromVesselTracking(Event evt) { return ((VesselTrackingEvent) evt).getVesselTracking().getProperties().getVessel().getId(); } private void aggregateVesselTrackingByVessel(KStream<String, Event> vesselTrackingEventsStream) { vesselTrackingEventsStream.groupByKey() .aggregate(HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent>::new, (k, v, map) -> aggregateVesselTrackingByVessel(k, v, map), Materialized.with(Serdes.String(), hashMapSerde)) .toStream().to(vesselTrackingAggByVesselTopic, Produced.with(Serdes.String(), hashMapSerde)); } private void processVesselPostUpdate() { KStream<String, ArrayList<UpdateVesselInVesselTrackingEvent>> join = vesselEvents.join(aggByVessel, (k, v) -> k, (updateReferenceEvent, vesselTrackingWithReferenceEvents) -> getPostUpdateEvent(updateReferenceEvent, vesselTrackingWithReferenceEvents)); // desagregar, cambiar clave por la de vesselTracking y enviar a topic join.flatMapValues(value -> value).selectKey((k, v) -> v.getAggregateId()).to(topic); } private HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> aggregateVesselTrackingByVessel( String key, Event value, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> hashMap) { VesselDTO vessel = ((VesselTrackingEvent) value).getVesselTracking().getProperties().getVessel(); if (vessel != null) { hashMap.put(value.getAggregateId(), new AggregationVesselInVesselTrackingPostUpdateEvent(value.getType(), vessel).buildFrom(value)); } return hashMap; } private ArrayList<UpdateVesselInVesselTrackingEvent> getPostUpdateEvent(Event updateReferenceEvent, HashMap<String, AggregationVesselInVesselTrackingPostUpdateEvent> vesselTrackingWithReferenceEvents) { ArrayList<UpdateVesselInVesselTrackingEvent> result = new ArrayList<>(); VesselDTO vessel = ((VesselEvent) updateReferenceEvent).getVessel(); for (Map.Entry<String, AggregationVesselInVesselTrackingPostUpdateEvent> entry : vesselTrackingWithReferenceEvents .entrySet()) { AggregationVesselInVesselTrackingPostUpdateEvent aggregationEvent = entry.getValue(); if (VesselTrackingEventTypes.isLocked(aggregationEvent.getType())) { if (!aggregationEvent.getType().equals(VesselTrackingEventTypes.DELETED)) { String message = "Item con id " + aggregationEvent.getAggregateId() + " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia " + updateReferenceEvent.getAggregateId(); logger.info(message); alertService.errorAlert(aggregationEvent.getAggregateId(), message); } } else if (!aggregationEvent.getVessel().equals(vessel)) { result.add((UpdateVesselInVesselTrackingEvent) VesselTrackingEventFactory.getEvent(aggregationEvent, updateReferenceEvent, VesselTrackingEventTypes.UPDATE_VESSEL)); } else { logger.debug("Vessel ya estaba actualizado o los campos indexados no han cambiado "); } } return result; @Override protected void processPostUpdateStream(KStream<String, Event> events) { // En series temporales no se hace postUpdate } }
vessels-commands/src/main/resources/application.properties +0 −6 Original line number Diff line number Diff line Loading @@ -84,12 +84,6 @@ broker.topic.vessel-tracking=vessel-tracking #topic del broker para enviar/recibir eventos de barcos broker.topic.vessel=vessel # topic para aggregar tracking por vessel broker.topic.tracking.agg.by.vessel=tracking-agg-by-vessel #topic para enviar eventos de vessel modificados para postupdate broker.topic.vessel.updated=vessel-updated #topic del broker para enviar/recibir eventos de tipos de barcos broker.topic.vessel-type=vessel-type Loading