Commit 7fbf1233 authored by Noel Alonso's avatar Noel Alonso
Browse files

Reubica clases base de streaming

parent bfc1fa59
Loading
Loading
Loading
Loading
+22 −2
Original line number Diff line number Diff line
package es.redmic.commandslib.streams;
package es.redmic.commandslib.streaming.common;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.commandslib.statestore.StreamConfig;

public abstract class BaseStreams {

@@ -76,4 +77,23 @@ public abstract class BaseStreams {
		throwable.printStackTrace();
		alertService.errorAlert(this.topic, msg);
	}

	/*
	 * En ocaciones el store se bloquea debido a operaciones de rebalanceo de kafka.
	 * Esta función permite esperar hasta que sea accesible.
	 */

	protected static <T> T waitUntilStoreIsQueryable(final String storeName,
			final QueryableStoreType<T> queryableStoreType, final KafkaStreams streams) {
		while (true) {
			try {
				return streams.store(storeName, queryableStoreType);
			} catch (InvalidStateStoreException ignored) {
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
				}
			}
		}
	}
}
+1 −1
Original line number Diff line number Diff line
package es.redmic.commandslib.statestore;
package es.redmic.commandslib.streaming.common;

public class StreamConfig {

+1 −1
Original line number Diff line number Diff line
package es.redmic.commandslib.statestore;
package es.redmic.commandslib.streaming.common;

import java.util.Map;
import java.util.Properties;
+4 −23
Original line number Diff line number Diff line
package es.redmic.commandslib.statestore;
package es.redmic.commandslib.streaming.statestore;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.streams.BaseStreams;
import es.redmic.commandslib.streaming.common.BaseStreams;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamUtils;

public abstract class StateStore extends BaseStreams {

@@ -36,23 +36,4 @@ public abstract class StateStore extends BaseStreams {

		this.store = waitUntilStoreIsQueryable(topic, QueryableStoreTypes.<String, Event>keyValueStore(), streams);
	}

	/*
	 * En ocaciones el store se bloquea debido a operaciones de rebalanceo de kafka.
	 * Esta función permite esperar hasta que sea accesible.
	 */

	protected static <T> T waitUntilStoreIsQueryable(final String storeName,
			final QueryableStoreType<T> queryableStoreType, final KafkaStreams streams) {
		while (true) {
			try {
				return streams.store(storeName, queryableStoreType);
			} catch (InvalidStateStoreException ignored) {
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
				}
			}
		}
	}
}
+6 −5
Original line number Diff line number Diff line
package es.redmic.commandslib.streams;
package es.redmic.commandslib.streaming.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@@ -6,14 +6,15 @@ import org.apache.kafka.streams.kstream.KStream;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.statestore.StreamConfig;
import es.redmic.commandslib.statestore.StreamUtils;
import es.redmic.commandslib.streaming.common.BaseStreams;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.common.StreamUtils;

public abstract class EventStreams extends BaseStreams {
public abstract class EventSourcingStreams extends BaseStreams {

	protected StreamsBuilder builder = new StreamsBuilder();

	public EventStreams(StreamConfig config, AlertService alertService) {
	public EventSourcingStreams(StreamConfig config, AlertService alertService) {
		super(config, alertService);
	}