Commit 38da4ee0 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade nuevas funcionalidades

Crea un cliente de schema registry

Añade función para obtener un serde genérico de avro
parent f9584365
Loading
Loading
Loading
Loading
+40 −14
Original line number Diff line number Diff line
package es.redmic.vesselrestrictionchecker.common;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

import es.redmic.vesselrestrictionchecker.utils.StreamsApplicationUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public abstract class StreamsApplicationBase {

	protected String schemaRegistryUrl;

	private SchemaRegistryClient schemaRegistryClient;

	public StreamsApplicationBase(String schemaRegistryUrl) {

		this.schemaRegistryUrl = schemaRegistryUrl;

		this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
	}

	// @formatter:off

	protected static final String APP_ID = "APP_ID",
@@ -18,7 +36,7 @@ public abstract class StreamsApplicationBase {
	// @formatter:on

	@SuppressWarnings("serial")
	protected static HashMap<String, Object> requiredVariablesBase = new HashMap<String, Object>() {
	protected static HashMap<String, String> requiredVariablesBase = new HashMap<String, String>() {
		{
			put(APP_ID, "Stream application identifier");
			put(BOOTSTRAP_SERVERS, "Kafka servers");
@@ -26,16 +44,9 @@ public abstract class StreamsApplicationBase {
		}
	};

	protected static StreamsBuilder builder = new StreamsBuilder();

	protected static void startStream(String appId, String bootstrapServers, String schemaRegistry) {
	public void startStreams(Topology topology, Properties props) {

		System.out.format("Kafka streams starting...%n");
		System.out.format("BootstrapServers: %s, SchemaRegistry: %s, AppId: %s%n", bootstrapServers, schemaRegistry,
				appId);

		KafkaStreams streams = new KafkaStreams(builder.build(),
				StreamsApplicationUtils.getStreamConfig(appId, bootstrapServers, schemaRegistry));
		KafkaStreams streams = new KafkaStreams(topology, props);

		streams.setUncaughtExceptionHandler(
				(Thread thread, Throwable throwable) -> uncaughtException(thread, throwable, streams));
@@ -45,7 +56,13 @@ public abstract class StreamsApplicationBase {
		addShutdownHookAndBlock(streams);
	}

	protected static void addShutdownHookAndBlock(KafkaStreams streams) {
	public Properties getKafkaProperties(String appId, String bootstrapServers) {

		// Sobrescribir método o añadir aquí properties específicas si fuera necesario
		return StreamsApplicationUtils.getStreamConfig(appId, bootstrapServers, schemaRegistryUrl);
	}

	protected void addShutdownHookAndBlock(KafkaStreams streams) {

		Thread.currentThread().setUncaughtExceptionHandler((t, e) -> uncaughtException(t, e, streams));

@@ -58,14 +75,14 @@ public abstract class StreamsApplicationBase {
		}));
	}

	protected static void uncaughtException(Thread thread, Throwable throwable, KafkaStreams streams) {
	protected void uncaughtException(Thread thread, Throwable throwable, KafkaStreams streams) {

		System.err.println("Error. The stream will stop working " + throwable.getLocalizedMessage());
		throwable.printStackTrace();
		streams.close();
	}

	protected static Map<String, Object> getEnvVariables(HashMap<String, Object> variablesRequired) {
	protected static Map<String, Object> getEnvVariables(HashMap<String, String> variablesRequired) {

		Map<String, Object> envVariables = new HashMap<>();

@@ -83,4 +100,13 @@ public abstract class StreamsApplicationBase {

		return envVariables;
	}

	protected <T extends SpecificRecord> SpecificAvroSerde<T> getValueSerde() {

		final SpecificAvroSerde<T> valueSerde = new SpecificAvroSerde<>(schemaRegistryClient);
		valueSerde.configure(
				Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl),
				false);
		return valueSerde;
	}
}