Commit 13c2870f authored by Noel Alonso's avatar Noel Alonso
Browse files

Se pasa a base alertService para enviar mensajes

parent 3c636629
Loading
Loading
Loading
Loading
+3 −2
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.streams.BaseStreams;

@@ -15,8 +16,8 @@ public abstract class StateStore extends BaseStreams {

	protected ReadOnlyKeyValueStore<String, Event> store;

	public StateStore(StreamConfig config) {
		super(config);
	public StateStore(StreamConfig config, AlertService alertService) {
		super(config, alertService);
	}

	@Override
+9 −3
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.commandslib.statestore.StreamConfig;

public abstract class BaseStreams {
@@ -26,13 +27,16 @@ public abstract class BaseStreams {

	protected KafkaStreams streams;

	public BaseStreams(StreamConfig config) {
	protected AlertService alertService;

	public BaseStreams(StreamConfig config, AlertService alertService) {
		this.topic = config.getTopic();
		this.stateStoreDir = config.getStateStoreDir();
		this.serviceId = config.getServiceId();
		this.bootstrapServers = config.getBootstrapServers();
		this.schemaRegistry = config.getSchemaRegistry();
		this.windowsTime = config.getWindowsTime();
		this.alertService = alertService;
	}

	protected void init() {
@@ -66,8 +70,10 @@ public abstract class BaseStreams {
	}

	private void uncaughtException(Thread thread, Throwable throwable) {
		// TODO: Mandar alerta
		logger.error("Error no conocido en kafka stream");

		String msg = "Error no conocido en kafka stream. El stream dejará de funcionar";
		logger.error(msg);
		throwable.printStackTrace();
		alertService.errorAlert(this.topic, msg);
	}
}
+7 −3
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.commandslib.statestore.StreamConfig;
import es.redmic.commandslib.statestore.StreamUtils;
@@ -12,8 +13,8 @@ public abstract class EventStreams extends BaseStreams {

	protected StreamsBuilder builder = new StreamsBuilder();

	public EventStreams(StreamConfig config) {
		super(config);
	public EventStreams(StreamConfig config, AlertService alertService) {
		super(config, alertService);
	}

	@Override
@@ -48,7 +49,10 @@ public abstract class EventStreams extends BaseStreams {
	protected boolean isSameSession(Event a, Event b) {

		if (!(a.getSessionId().equals(b.getSessionId()))) {
			logger.error("Se esperaba eventos con el mismo id de sesión");
			String message = "Recibido evento de petición con id de sessión diferente al evento de confirmación para item "
					+ a.getAggregateId();
			logger.error(message);
			alertService.errorAlert(a.getAggregateId(), message);
			return false;
		}
		return true;