Loading src/main/java/es/redmic/commandslib/streaming/streams/EventSourcingStreams.java +4 −3 Original line number Diff line number Diff line Loading @@ -153,16 +153,17 @@ public abstract class EventSourcingStreams extends BaseStreams { */ protected void processFailedChangeStream(KStream<String, Event> events) { // Stream filtrado por eventos de creaciones y modificaciones correctos (solo el // Stream filtrado por eventos finales con el item dentro (solo el // último que se produzca por id) KStream<String, Event> successEvents = events.filter((id, event) -> (EventTypes.CREATED.equals(event.getType()) || EventTypes.UPDATED.equals(event.getType()))); KStream<String, Event> successEvents = events.filter((id, event) -> isSnapshot(event.getType())); processUpdateFailedStream(events, successEvents); processDeleteFailedStream(events, successEvents); } protected abstract boolean isSnapshot(String eventType); /* * Función que a partir del último evento correcto y el evento fallido al * editar, envía evento de cancelación Loading Loading
src/main/java/es/redmic/commandslib/streaming/streams/EventSourcingStreams.java +4 −3 Original line number Diff line number Diff line Loading @@ -153,16 +153,17 @@ public abstract class EventSourcingStreams extends BaseStreams { */ protected void processFailedChangeStream(KStream<String, Event> events) { // Stream filtrado por eventos de creaciones y modificaciones correctos (solo el // Stream filtrado por eventos finales con el item dentro (solo el // último que se produzca por id) KStream<String, Event> successEvents = events.filter((id, event) -> (EventTypes.CREATED.equals(event.getType()) || EventTypes.UPDATED.equals(event.getType()))); KStream<String, Event> successEvents = events.filter((id, event) -> isSnapshot(event.getType())); processUpdateFailedStream(events, successEvents); processDeleteFailedStream(events, successEvents); } protected abstract boolean isSnapshot(String eventType); /* * Función que a partir del último evento correcto y el evento fallido al * editar, envía evento de cancelación Loading