Commit 9e5898e1 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade propiedad a configuración de streams

Con la propiedad windowsTime se pretende configurar el tamaño de la ventana
en milisegundos al hacer join de dos eventos. De este modo se podrá configurar
esta propiedad desde las properties de spring
parent 7dc2fefb
Loading
Loading
Loading
Loading
+20 −0
Original line number Diff line number Diff line
@@ -14,16 +14,21 @@ public class StreamConfig {

	private String serviceId;

	private Long windowsTime;

	public StreamConfig(Builder builder) {
		this.topic = builder.topic;
		this.stateStoreDir = builder.stateStoreDir;
		this.serviceId = builder.serviceId;
		this.bootstrapServers = builder.bootstrapServers;
		this.schemaRegistry = builder.schemaRegistry;
		this.windowsTime = builder.windowsTime;
	}

	public static class Builder {

		private static final long DEFAULT_WINDOWS_TIME_MS = 60000;

		private String bootstrapServers;

		private String schemaRegistry;
@@ -34,6 +39,8 @@ public class StreamConfig {

		private String serviceId;

		private Long windowsTime;

		public static Builder bootstrapServers(String bootstrapServers) {
			Builder builder = new Builder();
			builder.bootstrapServers = bootstrapServers;
@@ -60,6 +67,11 @@ public class StreamConfig {
			return this;
		}

		public Builder windowsTime(long windowsTime) {
			this.windowsTime = windowsTime;
			return this;
		}

		public StreamConfig build() {

			assertNotNull(bootstrapServers);
@@ -67,6 +79,10 @@ public class StreamConfig {
			assertNotNull(serviceId);
			assertNotNull(stateStoreDir);
			assertNotNull(topic);

			if (windowsTime == null) {
				windowsTime = DEFAULT_WINDOWS_TIME_MS;
			}
			return new StreamConfig(this);
		}
	}
@@ -90,4 +106,8 @@ public class StreamConfig {
	public String getServiceId() {
		return serviceId;
	}

	public long getWindowsTime() {
		return windowsTime;
	}
}
+3 −0
Original line number Diff line number Diff line
@@ -20,6 +20,8 @@ public abstract class BaseStreams {

	protected String serviceId;

	protected Long windowsTime;

	protected final String SCHEMA_REGISTRY_URL_PROPERTY = "schema.registry.url";

	protected KafkaStreams streams;
@@ -30,6 +32,7 @@ public abstract class BaseStreams {
		this.serviceId = config.getServiceId();
		this.bootstrapServers = config.getBootstrapServers();
		this.schemaRegistry = config.getSchemaRegistry();
		this.windowsTime = config.getWindowsTime();
	}

	protected void init() {