Commit 6ad6b8a5 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade HashMapSerde para agregar áreas por hashcode

parent e1be3298
Loading
Loading
Loading
Loading
+66 −0
Original line number Diff line number Diff line
package es.redmic.vesselrestrictionchecker.avro.hashmapserde;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

/**
 * Kafka {@link Deserializer} for {@code HashMap<K, V>} payloads written by the
 * matching HashMapSerializer: an int record count followed by, per entry, a
 * length-prefixed UTF-8 key and a length-prefixed Avro-encoded value.
 */
public class HashMapDeserializer<K, V> implements Deserializer<HashMap<K, V>> {

	KafkaAvroDeserializer deserializer;

	// Default constructor needed by Kafka
	public HashMapDeserializer() {
	}

	public HashMapDeserializer(KafkaAvroDeserializer deserializer) {
		this.deserializer = deserializer;
	}

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		this.deserializer.configure(configs, isKey);
	}

	/**
	 * Reconstructs the map from its length-prefixed wire format.
	 *
	 * @param topic topic associated with the data (forwarded to the Avro deserializer)
	 * @param bytes serialized map, possibly null
	 * @return the reconstructed map, or null when {@code bytes} is null or empty
	 * @throws RuntimeException if the payload is truncated or otherwise unreadable
	 */
	@SuppressWarnings("unchecked")
	@Override
	public HashMap<K, V> deserialize(String topic, byte[] bytes) {

		if (bytes == null || bytes.length == 0) {
			return null;
		}

		final HashMap<K, V> hashMap = new HashMap<K, V>();
		final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));

		try {
			final int records = dataInputStream.readInt();
			for (int i = 0; i < records; i++) {

				// readFully guarantees the whole block is consumed; plain read(byte[])
				// may return fewer bytes than requested and silently corrupt the data
				final byte[] keyBytes = new byte[dataInputStream.readInt()];
				dataInputStream.readFully(keyBytes);

				final byte[] valueBytes = new byte[dataInputStream.readInt()];
				dataInputStream.readFully(valueBytes);

				hashMap.put((K) new String(keyBytes, StandardCharsets.UTF_8),
						(V) deserializer.deserialize(topic, valueBytes));
			}
		} catch (IOException e) {
			throw new RuntimeException("Unable to deserialize HashMap", e);
		}

		return hashMap;
	}

	@Override
	public void close() {
		this.deserializer.close();
	}
}
+59 −0
Original line number Diff line number Diff line
package es.redmic.vesselrestrictionchecker.avro.hashmapserde;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

/**
 * Kafka {@link Serde} for {@code HashMap<K, V>} values whose String keys travel
 * as UTF-8 and whose values are (de)serialized with Confluent's Avro
 * serializers against the given Schema Registry.
 */
public class HashMapSerde<K, V> implements Serde<HashMap<K, V>> {

	private final Serde<HashMap<K, V>> inner;

	/**
	 * Builds the serde with specific-Avro-reader enabled.
	 *
	 * @param schemaRegistryUrl URL of the Confluent Schema Registry
	 */
	public HashMapSerde(String schemaRegistryUrl) {

		// Typed config map avoids the raw-type Properties -> HashMap round-trip
		// (Confluent's serializer constructors accept Map<String, ?>)
		Map<String, Object> defaultConfig = new HashMap<>();
		defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
		defaultConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

		// 1000 = max schemas cached per subject by the registry client
		SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

		KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistry, defaultConfig);
		KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistry, defaultConfig);

		inner = Serdes.serdeFrom(new HashMapSerializer<>(serializer), new HashMapDeserializer<>(deserializer));
	}

	@Override
	public Serializer<HashMap<K, V>> serializer() {
		return inner.serializer();
	}

	@Override
	public Deserializer<HashMap<K, V>> deserializer() {
		return inner.deserializer();
	}

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		inner.serializer().configure(configs, isKey);
		inner.deserializer().configure(configs, isKey);
	}

	@Override
	public void close() {
		inner.serializer().close();
		inner.deserializer().close();
	}
}
+58 −0
Original line number Diff line number Diff line
package es.redmic.vesselrestrictionchecker.avro.hashmapserde;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

/**
 * Kafka {@link Serializer} for {@code HashMap<K, V>} payloads: writes an int
 * record count followed by, per entry, a length-prefixed UTF-8 key and a
 * length-prefixed Avro-encoded value (the format read back by
 * HashMapDeserializer).
 */
public class HashMapSerializer<K, V> implements Serializer<HashMap<K, V>> {

	KafkaAvroSerializer serializer;

	// Default constructor needed by Kafka
	public HashMapSerializer() {
	}

	public HashMapSerializer(KafkaAvroSerializer serializer) {
		this.serializer = serializer;
	}

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		this.serializer.configure(configs, isKey);
	}

	/**
	 * Serializes the map into its length-prefixed wire format.
	 *
	 * @param topic topic associated with the data (forwarded to the Avro serializer)
	 * @param queue map to serialize, possibly null
	 * @return the serialized bytes, or null when {@code queue} is null
	 * @throws RuntimeException if writing the payload fails
	 */
	@Override
	public byte[] serialize(String topic, HashMap<K, V> queue) {

		// Mirror the deserializer's contract (null in -> null out) instead of NPE
		if (queue == null) {
			return null;
		}

		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
		final DataOutputStream dos = new DataOutputStream(baos);
		try {
			dos.writeInt(queue.size());
			for (Map.Entry<K, V> entry : queue.entrySet()) {

				// Explicit UTF-8: the deserializer decodes keys as UTF-8, so the
				// platform default charset must not be used here
				final byte[] bytesKey = entry.getKey().toString().getBytes(StandardCharsets.UTF_8);
				dos.writeInt(bytesKey.length);
				dos.write(bytesKey);

				final byte[] bytesValue = serializer.serialize(topic, entry.getValue());
				dos.writeInt(bytesValue.length);
				dos.write(bytesValue);
			}
		} catch (IOException e) {
			throw new RuntimeException("Unable to serialize HashMap", e);
		}
		return baos.toByteArray();
	}

	@Override
	public void close() {
		this.serializer.close();
	}
}