Commit 9ad00484 authored by Noel Alonso's avatar Noel Alonso
Browse files

Cambia constructor añadiendo SchemaRegistryClient

De esta manera es mucho más flexible, pudiendo cambiar el tipo de cliente
dependiendo del entorno u otras necesidades

Añade método para obtener serdes de tipo específico y genérico
parent fbf318e2
Loading
Loading
Loading
Loading
+17 −8
Original line number Diff line number Diff line
@@ -10,22 +10,22 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

import es.redmic.vesselrestrictionchecker.utils.StreamsApplicationUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public abstract class StreamsApplicationBase {

	protected String schemaRegistryUrl;

	private SchemaRegistryClient schemaRegistryClient;
	protected SchemaRegistryClient schemaRegistryClient;

	public StreamsApplicationBase(String schemaRegistryUrl) {
	public StreamsApplicationBase(SchemaRegistryClient schemaRegistryClient, String schemaRegistryUrl) {

		this.schemaRegistryUrl = schemaRegistryUrl;

		this.schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
		this.schemaRegistryClient = schemaRegistryClient;
	}

	// @formatter:off
@@ -101,12 +101,21 @@ public abstract class StreamsApplicationBase {
		return envVariables;
	}

	protected <T extends SpecificRecord> SpecificAvroSerde<T> getValueSerde() {
	protected <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde() {

		final SpecificAvroSerde<T> valueSerde = new SpecificAvroSerde<>(schemaRegistryClient);
		valueSerde.configure(
				Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl),
				false);
		valueSerde.configure(getSerdeConfig(), false);
		return valueSerde;
	}

	protected GenericAvroSerde getGenericAvroSerde() {

		final GenericAvroSerde valueSerde = new GenericAvroSerde(schemaRegistryClient);
		valueSerde.configure(getSerdeConfig(), false);
		return valueSerde;
	}

	private Map<String, String> getSerdeConfig() {
		return Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
	}
}