Loading src/main/java/es/redmic/commandslib/streaming/streams/EventSourcingStreams.java +10 −0 Original line number Diff line number Diff line Loading @@ -17,8 +17,13 @@ public abstract class EventSourcingStreams extends BaseStreams { protected StreamsBuilder builder = new StreamsBuilder(); protected String snapshotTopicSuffix = "-snapshot"; protected String snapshotTopic; public EventSourcingStreams(StreamConfig config, AlertService alertService) { super(config, alertService); snapshotTopic = topic + snapshotTopicSuffix; } @Override Loading @@ -28,6 +33,9 @@ public abstract class EventSourcingStreams extends BaseStreams { KStream<String, Event> events = builder.stream(topic); // Reenvia eventos snapshot al topic correspondiente forwardSnapshotEvents(events); // Realiza el enriquecimiento del item antes de crear processEnrichCreateSteam(events); Loading @@ -53,6 +61,8 @@ public abstract class EventSourcingStreams extends BaseStreams { StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry)); } protected abstract void forwardSnapshotEvents(KStream<String, Event> events); /* * Función para crear streams extra que sean necesarios y específicos de cada * tipo Loading Loading
src/main/java/es/redmic/commandslib/streaming/streams/EventSourcingStreams.java +10 −0 Original line number Diff line number Diff line Loading @@ -17,8 +17,13 @@ public abstract class EventSourcingStreams extends BaseStreams { protected StreamsBuilder builder = new StreamsBuilder(); protected String snapshotTopicSuffix = "-snapshot"; protected String snapshotTopic; public EventSourcingStreams(StreamConfig config, AlertService alertService) { super(config, alertService); snapshotTopic = topic + snapshotTopicSuffix; } @Override Loading @@ -28,6 +33,9 @@ public abstract class EventSourcingStreams extends BaseStreams { KStream<String, Event> events = builder.stream(topic); // Reenvia eventos snapshot al topic correspondiente forwardSnapshotEvents(events); // Realiza el enriquecimiento del item antes de crear processEnrichCreateSteam(events); Loading @@ -53,6 +61,8 @@ public abstract class EventSourcingStreams extends BaseStreams { StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry)); } protected abstract void forwardSnapshotEvents(KStream<String, Event> events); /* * Función para crear streams extra que sean necesarios y específicos de cada * tipo Loading