Loading atlas-commands/src/main/java/es/redmic/atlascommands/streams/CategoryEventStreams.java +0 −9 Original line number Diff line number Diff line Loading @@ -175,15 +175,6 @@ public class CategoryEventStreams extends EventSourcingStreams { // En este caso no hay modificaciones parciales } /** * Función para procesar modificaciones de referencias */ @Override protected void processPostUpdateStream(KStream<String, Event> events) { // En este caso no hay modificación de relaciones } @Override protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) { } Loading atlas-commands/src/main/java/es/redmic/atlascommands/streams/ThemeInspireEventStreams.java +1 −55 Original line number Diff line number Diff line Loading @@ -34,25 +34,12 @@ import es.redmic.commandslib.streaming.streams.EventSourcingStreams; public class ThemeInspireEventStreams extends EventSourcingStreams { // private String atlasAggByThemeInspireTopic; // private HashMapSerde<String, AggregationThemeInspireInAtlasPostUpdateEvent> // hashMapSerde; // private KTable<String, HashMap<String, // AggregationThemeInspireInAtlasPostUpdateEvent>> aggByThemeInspire; public ThemeInspireEventStreams(StreamConfig config, String atlasAggByThemeInspireTopic, AlertService alertService) { public ThemeInspireEventStreams(StreamConfig config, AlertService alertService) { super(config, alertService); // this.atlasAggByThemeInspireTopic = atlasAggByThemeInspireTopic; // this.hashMapSerde = new HashMapSerde<>(schemaRegistry); init(); } /** * Crea KTable de atlas agregados por themeinspire * * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams# * createExtraStreams() Loading @@ -60,8 +47,6 @@ public class ThemeInspireEventStreams extends EventSourcingStreams { @Override protected void createExtraStreams() { // aggByThemeInspire = builder.table(atlasAggByThemeInspireTopic, // Consumed.with(Serdes.String(), hashMapSerde)); } /** Loading Loading @@ -125,36 +110,6 @@ public class ThemeInspireEventStreams extends EventSourcingStreams { } /*-@Override protected void processDeleteStream(KStream<String, Event> events) { // Stream filtrado por eventos de borrado KStream<String, Event> deleteEvents = events .filter((id, event) -> (EventTypes.CHECK_DELETE.equals(event.getType()))); deleteEvents.leftJoin(aggByThemeInspire, (deleteEvent, atlasAggByThemeInspire) -> getCheckDeleteResultEvent(deleteEvent, atlasAggByThemeInspire)) .to(topic); } @SuppressWarnings("serial") private Event getCheckDeleteResultEvent(Event deleteEvent, HashMap<String, AggregationThemeInspireInAtlasPostUpdateEvent> atlasAggByThemeInspire) { if (atlasAggByThemeInspire == null || atlasAggByThemeInspire.isEmpty()) { // elemento no referenciado return ThemeInspireEventFactory.getEvent(deleteEvent, ThemeInspireEventTypes.DELETE_CHECKED); } else { // elemento referenciado return ThemeInspireEventFactory.getEvent(deleteEvent, ThemeInspireEventTypes.DELETE_CHECK_FAILED, ExceptionType.ITEM_REFERENCED.toString(), new HashMap<String, String>() { { put("id", deleteEvent.getAggregateId()); } }); } }-*/ /** * Función que a partir del evento fallido y el último evento correcto, genera * evento UpdateCancelled Loading Loading @@ -211,15 +166,6 @@ public class ThemeInspireEventStreams extends EventSourcingStreams { // En este caso no hay modificaciones parciales } /** * Función para procesar modificaciones de referencias */ @Override protected void processPostUpdateStream(KStream<String, Event> events) { // En este caso no hay modificación de relaciones } @Override protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) { } Loading Loading
atlas-commands/src/main/java/es/redmic/atlascommands/streams/CategoryEventStreams.java +0 −9 Original line number Diff line number Diff line Loading @@ -175,15 +175,6 @@ public class CategoryEventStreams extends EventSourcingStreams { // En este caso no hay modificaciones parciales } /** * Función para procesar modificaciones de referencias */ @Override protected void processPostUpdateStream(KStream<String, Event> events) { // En este caso no hay modificación de relaciones } @Override protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) { } Loading
atlas-commands/src/main/java/es/redmic/atlascommands/streams/ThemeInspireEventStreams.java +1 −55 Original line number Diff line number Diff line Loading @@ -34,25 +34,12 @@ import es.redmic.commandslib.streaming.streams.EventSourcingStreams; public class ThemeInspireEventStreams extends EventSourcingStreams { // private String atlasAggByThemeInspireTopic; // private HashMapSerde<String, AggregationThemeInspireInAtlasPostUpdateEvent> // hashMapSerde; // private KTable<String, HashMap<String, // AggregationThemeInspireInAtlasPostUpdateEvent>> aggByThemeInspire; public ThemeInspireEventStreams(StreamConfig config, String atlasAggByThemeInspireTopic, AlertService alertService) { public ThemeInspireEventStreams(StreamConfig config, AlertService alertService) { super(config, alertService); // this.atlasAggByThemeInspireTopic = atlasAggByThemeInspireTopic; // this.hashMapSerde = new HashMapSerde<>(schemaRegistry); init(); } /** * Crea KTable de atlas agregados por themeinspire * * @see es.redmic.commandslib.streaming.streams.EventSourcingStreams# * createExtraStreams() Loading @@ -60,8 +47,6 @@ public class ThemeInspireEventStreams extends EventSourcingStreams { @Override protected void createExtraStreams() { // aggByThemeInspire = builder.table(atlasAggByThemeInspireTopic, // Consumed.with(Serdes.String(), hashMapSerde)); } /** Loading Loading @@ -125,36 +110,6 @@ public class ThemeInspireEventStreams extends EventSourcingStreams { } /*-@Override protected void processDeleteStream(KStream<String, Event> events) { // Stream filtrado por eventos de borrado KStream<String, Event> deleteEvents = events .filter((id, event) -> (EventTypes.CHECK_DELETE.equals(event.getType()))); deleteEvents.leftJoin(aggByThemeInspire, (deleteEvent, atlasAggByThemeInspire) -> getCheckDeleteResultEvent(deleteEvent, atlasAggByThemeInspire)) .to(topic); } @SuppressWarnings("serial") private Event getCheckDeleteResultEvent(Event deleteEvent, HashMap<String, AggregationThemeInspireInAtlasPostUpdateEvent> atlasAggByThemeInspire) { if (atlasAggByThemeInspire == null || atlasAggByThemeInspire.isEmpty()) { // elemento no referenciado return ThemeInspireEventFactory.getEvent(deleteEvent, ThemeInspireEventTypes.DELETE_CHECKED); } else { // elemento referenciado return ThemeInspireEventFactory.getEvent(deleteEvent, ThemeInspireEventTypes.DELETE_CHECK_FAILED, ExceptionType.ITEM_REFERENCED.toString(), new HashMap<String, String>() { { put("id", deleteEvent.getAggregateId()); } }); } }-*/ /** * Función que a partir del evento fallido y el último evento correcto, genera * evento UpdateCancelled Loading Loading @@ -211,15 +166,6 @@ public class ThemeInspireEventStreams extends EventSourcingStreams { // En este caso no hay modificaciones parciales } /** * Función para procesar modificaciones de referencias */ @Override protected void processPostUpdateStream(KStream<String, Event> events) { // En este caso no hay modificación de relaciones } @Override protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) { } Loading