Commit 635fc77a authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade propiedad de schemaRegistry para streams

Con esta propiedad se consigue que se compruebe la compatibilidad de esquemas
por el nombre del registro dentro de cada topic en lugar de para todos los esquemas del topic
parent 8e7aadfd
Loading
Loading
Loading
Loading
+8 −1
Original line number Diff line number Diff line
@@ -14,7 +14,12 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class StreamUtils {

	protected final static String SCHEMA_REGISTRY_URL_PROPERTY = "schema.registry.url";
	// @formatter:off

	protected final static String SCHEMA_REGISTRY_URL_PROPERTY = "schema.registry.url",
			SCHEMA_REGISTRY_VALUE_SUBJECT_NAME_STRATEGY = "value.subject.name.strategy";

	// @formatter:on

	public static Properties baseStreamsConfig(String bootstrapServers, String stateDir, String appId,
			String schemaRegistry) {
@@ -34,6 +39,8 @@ public class StreamUtils {
		config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
		config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
		config.put(SCHEMA_REGISTRY_URL_PROPERTY, schemaRegistry);
		config.put(SCHEMA_REGISTRY_VALUE_SUBJECT_NAME_STRATEGY,
				"io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");

		return config;
	}