Loading src/main/java/es/redmic/commandslib/streaming/streams/EventSourcingStreams.java +6 −10 Original line number Diff line number Diff line Loading @@ -33,6 +33,8 @@ public abstract class EventSourcingStreams extends BaseStreams { KStream<String, Event> events = builder.stream(topic); KStream<String, Event> snapshotEvents = builder.stream(snapshotTopic); // Reenvia eventos snapshot al topic correspondiente forwardSnapshotEvents(events); Loading @@ -52,7 +54,7 @@ public abstract class EventSourcingStreams extends BaseStreams { processDeleteStream(events); // Failed change processFailedChangeStream(events); processFailedChangeStream(events, snapshotEvents); // PostUpdate processPostUpdateStream(events); Loading Loading @@ -161,19 +163,13 @@ public abstract class EventSourcingStreams extends BaseStreams { /* * Función que procesa los eventos fallidos */ protected void processFailedChangeStream(KStream<String, Event> events) { // Stream filtrado por eventos finales con el item dentro (solo el // último que se produzca por id) KStream<String, Event> successEvents = events.filter((id, event) -> isSnapshot(event.getType())); protected void processFailedChangeStream(KStream<String, Event> events, KStream<String, Event> snapshotEvents) { processUpdateFailedStream(events, successEvents); processUpdateFailedStream(events, snapshotEvents); processDeleteFailedStream(events, successEvents); processDeleteFailedStream(events, snapshotEvents); } protected abstract boolean isSnapshot(String eventType); /* * Función que a partir del último evento correcto y el evento fallido al * editar, envía evento de cancelación Loading Loading
src/main/java/es/redmic/commandslib/streaming/streams/EventSourcingStreams.java +6 −10 Original line number Diff line number Diff line Loading @@ -33,6 +33,8 @@ public abstract class EventSourcingStreams extends BaseStreams { KStream<String, Event> events = builder.stream(topic); KStream<String, Event> snapshotEvents = builder.stream(snapshotTopic); // Reenvia eventos snapshot al topic correspondiente forwardSnapshotEvents(events); Loading @@ -52,7 +54,7 @@ public abstract class EventSourcingStreams extends BaseStreams { processDeleteStream(events); // Failed change processFailedChangeStream(events); processFailedChangeStream(events, snapshotEvents); // PostUpdate processPostUpdateStream(events); Loading Loading @@ -161,19 +163,13 @@ public abstract class EventSourcingStreams extends BaseStreams { /* * Función que procesa los eventos fallidos */ protected void processFailedChangeStream(KStream<String, Event> events) { // Stream filtrado por eventos finales con el item dentro (solo el // último que se produzca por id) KStream<String, Event> successEvents = events.filter((id, event) -> isSnapshot(event.getType())); protected void processFailedChangeStream(KStream<String, Event> events, KStream<String, Event> snapshotEvents) { processUpdateFailedStream(events, successEvents); processUpdateFailedStream(events, snapshotEvents); processDeleteFailedStream(events, successEvents); processDeleteFailedStream(events, snapshotEvents); } protected abstract boolean isSnapshot(String eventType); /* * Función que a partir del último evento correcto y el evento fallido al * editar, envía evento de cancelación Loading