Commit 7b699a54 authored by Noel Alonso's avatar Noel Alonso
Browse files

Serializa y deserializa la clave sin registry

parent c77dccdc
Loading
Loading
Loading
Loading
+2 −5
Original line number Diff line number Diff line
@@ -3,11 +3,11 @@ package es.redmic.brokerlib.avro.serde.hashmap;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

@@ -15,15 +15,12 @@ public class HashMapDeserializer<K, V> implements Deserializer<HashMap<K, V>> {

	KafkaAvroDeserializer deserializer;

	StringDeserializer stringDeserializer;

	// Default constructor needed by Kafka
	public HashMapDeserializer() {
	}

	public HashMapDeserializer(KafkaAvroDeserializer deserializer) {
		this.deserializer = deserializer;
		this.stringDeserializer = new StringDeserializer();
	}

	@Override
@@ -52,7 +49,7 @@ public class HashMapDeserializer<K, V> implements Deserializer<HashMap<K, V>> {
				final byte[] valueBytes = new byte[dataInputStream.readInt()];
				dataInputStream.read(valueBytes);

				hashMap.put((K) stringDeserializer.deserialize(topic, keyBytes),
				hashMap.put((K) new String(keyBytes, StandardCharsets.UTF_8),
						(V) deserializer.deserialize(topic, valueBytes));
			}
		} catch (IOException e) {
+0 −2
Original line number Diff line number Diff line
@@ -14,7 +14,6 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;

public class HashMapSerde<K, V> implements Serde<HashMap<K, V>> {

@@ -26,7 +25,6 @@ public class HashMapSerde<K, V> implements Serde<HashMap<K, V>> {
		Properties defaultConfig = new Properties();
		defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
		defaultConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
		defaultConfig.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class);

		SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

+1 −5
Original line number Diff line number Diff line
@@ -7,7 +7,6 @@ import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

@@ -15,15 +14,12 @@ public class HashMapSerializer<K, V> implements Serializer<HashMap<K, V>> {

	KafkaAvroSerializer serializer;

	StringSerializer stringSerializer;

	// Default constructor needed by Kafka
	public HashMapSerializer() {
	}

	public HashMapSerializer(KafkaAvroSerializer serializer) {
		this.serializer = serializer;
		this.stringSerializer = new StringSerializer();
	}

	@Override
@@ -41,7 +37,7 @@ public class HashMapSerializer<K, V> implements Serializer<HashMap<K, V>> {
			dos.writeInt(size);
			for (Map.Entry<K, V> entry : queue.entrySet()) {

				final byte[] bytesKey = stringSerializer.serialize(topic, entry.getKey().toString());
				final byte[] bytesKey = entry.getKey().toString().getBytes();
				dos.writeInt(bytesKey.length);
				dos.write(bytesKey);