Loading src/main/java/es/redmic/brokerlib/avro/serde/hashmap/HashMapDeserializer.java +5 −1 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer; Loading @@ -14,12 +15,15 @@ public class HashMapDeserializer<K, V> implements Deserializer<HashMap<K, V>> { KafkaAvroDeserializer deserializer; StringDeserializer stringDeserializer; // Default constructor needed by Kafka public HashMapDeserializer() { } public HashMapDeserializer(KafkaAvroDeserializer deserializer) { this.deserializer = deserializer; this.stringDeserializer = new StringDeserializer(); } @Override Loading Loading @@ -48,7 +52,7 @@ public class HashMapDeserializer<K, V> implements Deserializer<HashMap<K, V>> { final byte[] valueBytes = new byte[dataInputStream.readInt()]; dataInputStream.read(valueBytes); hashMap.put((K) deserializer.deserialize(topic, keyBytes), hashMap.put((K) stringDeserializer.deserialize(topic, keyBytes), (V) deserializer.deserialize(topic, valueBytes)); } } catch (IOException e) { Loading src/main/java/es/redmic/brokerlib/avro/serde/hashmap/HashMapSerde.java +2 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; public class HashMapSerde<K, V> implements Serde<HashMap<K, V>> { Loading @@ -25,6 +26,7 @@ public class HashMapSerde<K, V> implements Serde<HashMap<K, V>> { Properties defaultConfig = new Properties(); defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); defaultConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); defaultConfig.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class); SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000); Loading src/main/java/es/redmic/brokerlib/avro/serde/hashmap/HashMapSerializer.java +5 −1 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; Loading @@ -14,12 +15,15 @@ public class HashMapSerializer<K, V> implements Serializer<HashMap<K, V>> { KafkaAvroSerializer serializer; StringSerializer stringSerializer; // Default constructor needed by Kafka public HashMapSerializer() { } public HashMapSerializer(KafkaAvroSerializer serializer) { this.serializer = serializer; this.stringSerializer = new StringSerializer(); } @Override Loading @@ -37,7 +41,7 @@ public class HashMapSerializer<K, V> implements Serializer<HashMap<K, V>> { dos.writeInt(size); for (Map.Entry<K, V> entry : queue.entrySet()) { final byte[] bytesKey = serializer.serialize(topic, entry.getKey()); final byte[] bytesKey = stringSerializer.serialize(topic, entry.getKey().toString()); dos.writeInt(bytesKey.length); dos.write(bytesKey); Loading Loading
src/main/java/es/redmic/brokerlib/avro/serde/hashmap/HashMapDeserializer.java +5 −1 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer; Loading @@ -14,12 +15,15 @@ public class HashMapDeserializer<K, V> implements Deserializer<HashMap<K, V>> { KafkaAvroDeserializer deserializer; StringDeserializer stringDeserializer; // Default constructor needed by Kafka public HashMapDeserializer() { } public HashMapDeserializer(KafkaAvroDeserializer deserializer) { this.deserializer = deserializer; this.stringDeserializer = new StringDeserializer(); } @Override Loading Loading @@ -48,7 +52,7 @@ public class HashMapDeserializer<K, V> implements Deserializer<HashMap<K, V>> { final byte[] valueBytes = new byte[dataInputStream.readInt()]; dataInputStream.read(valueBytes); hashMap.put((K) deserializer.deserialize(topic, keyBytes), hashMap.put((K) stringDeserializer.deserialize(topic, keyBytes), (V) deserializer.deserialize(topic, valueBytes)); } } catch (IOException e) { Loading
src/main/java/es/redmic/brokerlib/avro/serde/hashmap/HashMapSerde.java +2 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; public class HashMapSerde<K, V> implements Serde<HashMap<K, V>> { Loading @@ -25,6 +26,7 @@ public class HashMapSerde<K, V> implements Serde<HashMap<K, V>> { Properties defaultConfig = new Properties(); defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); defaultConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); defaultConfig.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class); SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000); Loading
src/main/java/es/redmic/brokerlib/avro/serde/hashmap/HashMapSerializer.java +5 −1 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; Loading @@ -14,12 +15,15 @@ public class HashMapSerializer<K, V> implements Serializer<HashMap<K, V>> { KafkaAvroSerializer serializer; StringSerializer stringSerializer; // Default constructor needed by Kafka public HashMapSerializer() { } public HashMapSerializer(KafkaAvroSerializer serializer) { this.serializer = serializer; this.stringSerializer = new StringSerializer(); } @Override Loading @@ -37,7 +41,7 @@ public class HashMapSerializer<K, V> implements Serializer<HashMap<K, V>> { dos.writeInt(size); for (Map.Entry<K, V> entry : queue.entrySet()) { final byte[] bytesKey = serializer.serialize(topic, entry.getKey()); final byte[] bytesKey = stringSerializer.serialize(topic, entry.getKey().toString()); dos.writeInt(bytesKey.length); dos.write(bytesKey); Loading