Commit fbf318e2 authored by Noel Alonso's avatar Noel Alonso
Browse files

Cambia constructor para pasar SchemaRegistryClient

Además añade config
parent e0616d96
Loading
Loading
Loading
Loading
+7 −4
Original line number Diff line number Diff line
package es.redmic.vesselrestrictionchecker.avro.hashmapserde;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -9,8 +10,8 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
@@ -20,19 +21,21 @@ public class HashMapSerde<K, V> implements Serde<HashMap<K, V>> {
	private final Serde<HashMap<K, V>> inner;

	@SuppressWarnings({ "rawtypes", "unchecked" })
	public HashMapSerde(String schemaRegistryUrl) {
	public HashMapSerde(SchemaRegistryClient schemaRegistry, String schemaRegistryUrl) {

		Properties defaultConfig = new Properties();
		defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
		defaultConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

		SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

		KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistry, new HashMap(defaultConfig));

		KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(defaultConfig));

		inner = Serdes.serdeFrom(new HashMapSerializer<>(serializer), new HashMapDeserializer<>(deserializer));

		inner.configure(
				Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl),
				false);
	}

	@Override