Commit ef7aaf53 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade propiedades de configuración de streams

parent 262d3226
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
@@ -37,8 +38,15 @@ public class StreamUtils {

		// config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
		// StreamsConfig.EXACTLY_ONCE);

		config.put(StreamsConfig.RETRIES_CONFIG, 10);
		config.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 200);
		config.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 605000);
		config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1); // commit as fast as possible

		config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
				LogAndContinueExceptionHandler.class);

		config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
		config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
		config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());