Loading src/main/java/es/redmic/vesselrestrictionchecker/utils/StreamsApplicationUtils.java +12 −9 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker.utils; import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.processor.WallclockTimestampExtractor; import es.redmic.vesselrestrictionchecker.common.CustomRocksDBConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; public class StreamsApplicationUtils { Loading @@ -32,10 +34,11 @@ public class StreamsApplicationUtils { config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()); config.put(SCHEMA_REGISTRY_URL_PROPERTY, schemaRegistry); config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); config.put(SCHEMA_REGISTRY_VALUE_SUBJECT_NAME_STRATEGY, "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"); Loading Loading
src/main/java/es/redmic/vesselrestrictionchecker/utils/StreamsApplicationUtils.java +12 −9 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker.utils; import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.processor.WallclockTimestampExtractor; import es.redmic.vesselrestrictionchecker.common.CustomRocksDBConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; public class StreamsApplicationUtils { Loading @@ -32,10 +34,11 @@ public class StreamsApplicationUtils { config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()); config.put(SCHEMA_REGISTRY_URL_PROPERTY, schemaRegistry); config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); config.put(SCHEMA_REGISTRY_VALUE_SUBJECT_NAME_STRATEGY, "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"); Loading