Commit 83c57ed6 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade procesamiento para eventos rollback

Pasa a los métodos de procesamiento la ktable de los snapshot en luegar
de realizarlo en cada uno de ellos.
parent 6b0a8c27
Loading
Loading
Loading
Loading
+29 −12
Original line number Diff line number Diff line
@@ -54,6 +54,9 @@ public abstract class EventSourcingStreams extends BaseStreams {

		KStream<String, Event> snapshotEvents = builder.stream(snapshotTopic);

		KTable<String, Event> snapshotEventsTable = snapshotEvents.groupByKey()
				.reduce((aggValue, newValue) -> newValue);

		// Reenvia eventos snapshot al topic correspondiente
		forwardSnapshotEvents(events);

@@ -73,10 +76,13 @@ public abstract class EventSourcingStreams extends BaseStreams {
		processDeleteStream(events);

		// Failed change
		processFailedChangeStream(events, snapshotEvents);
		processFailedChangeStream(events, snapshotEventsTable);

		// extra process
		processExtraStreams(events, snapshotEvents);
		processExtraStreams(events, snapshotEventsTable);

		// Rollback
		proccessRollbackStream(events, snapshotEventsTable);

		return new KafkaStreams(builder.build(),
				StreamUtils.baseStreamsConfig(bootstrapServers, stateStoreDir, serviceId, schemaRegistry));
@@ -188,11 +194,11 @@ public abstract class EventSourcingStreams extends BaseStreams {
	/*
	 * Función que procesa los eventos fallidos
	 */
	protected void processFailedChangeStream(KStream<String, Event> events, KStream<String, Event> snapshotEvents) {
	protected void processFailedChangeStream(KStream<String, Event> events, KTable<String, Event> successEventsTable) {

		processUpdateFailedStream(events, snapshotEvents);
		processUpdateFailedStream(events, successEventsTable);

		processDeleteFailedStream(events, snapshotEvents);
		processDeleteFailedStream(events, successEventsTable);
	}

	/*
@@ -200,14 +206,12 @@ public abstract class EventSourcingStreams extends BaseStreams {
	 * editar, envía evento de cancelación
	 */

	protected void processUpdateFailedStream(KStream<String, Event> events, KStream<String, Event> successEvents) {
	protected void processUpdateFailedStream(KStream<String, Event> events, KTable<String, Event> successEventsTable) {

		// Stream filtrado por eventos de fallo al modificar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (EventTypes.UPDATE_FAILED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
@@ -227,14 +231,12 @@ public abstract class EventSourcingStreams extends BaseStreams {
	 * eliminar, envía evento de cancelación
	 */

	protected void processDeleteFailedStream(KStream<String, Event> events, KStream<String, Event> successEvents) {
	protected void processDeleteFailedStream(KStream<String, Event> events, KTable<String, Event> successEventsTable) {

		// Stream filtrado por eventos de fallo al borrar
		KStream<String, Event> failedEvents = events
				.filter((id, event) -> (EventTypes.DELETE_FAILED.equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
@@ -257,5 +259,20 @@ public abstract class EventSourcingStreams extends BaseStreams {
	 * Función para procesar eventos a partir del stream principal y/o snapshot.
	 * Dependerá del servicio que lo implemente
	 */
	protected abstract void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents);
	protected abstract void processExtraStreams(KStream<String, Event> events,
			KTable<String, Event> successEventsTable);

	private void proccessRollbackStream(KStream<String, Event> events, KTable<String, Event> successEventsTable) {

		// Stream filtrado por eventos pre rollback
		KStream<String, Event> prepareRollbackEvents = events
				.filter((id, event) -> (EventTypes.PREPARE_ROLLBACK.equals(event.getType())));

		// Join por id, mandando a kafka el evento específico
		prepareRollbackEvents.join(successEventsTable,
				(prepareRollbackEvent, lastSuccessEvent) -> getRollbackEvent(prepareRollbackEvent, lastSuccessEvent))
				.to(topic);
	}

	protected abstract Event getRollbackEvent(Event prepareRollbackEvent, Event lastSuccessEvent);
}