Commit 8aa42ce9 authored by Noel Alonso's avatar Noel Alonso
Browse files

Merge branch 'master' into dev

parents 60197eeb 9e5898e1
Loading
Loading
Loading
Loading
+20 −0
Original line number Diff line number Diff line
@@ -14,16 +14,21 @@ public class StreamConfig {

	private String serviceId;

	private Long windowsTime;

	public StreamConfig(Builder builder) {
		this.topic = builder.topic;
		this.stateStoreDir = builder.stateStoreDir;
		this.serviceId = builder.serviceId;
		this.bootstrapServers = builder.bootstrapServers;
		this.schemaRegistry = builder.schemaRegistry;
		this.windowsTime = builder.windowsTime;
	}

	public static class Builder {

		private static final long DEFAULT_WINDOWS_TIME_MS = 60000;

		private String bootstrapServers;

		private String schemaRegistry;
@@ -34,6 +39,8 @@ public class StreamConfig {

		private String serviceId;

		private Long windowsTime;

		public static Builder bootstrapServers(String bootstrapServers) {
			Builder builder = new Builder();
			builder.bootstrapServers = bootstrapServers;
@@ -60,6 +67,11 @@ public class StreamConfig {
			return this;
		}

		public Builder windowsTime(long windowsTime) {
			this.windowsTime = windowsTime;
			return this;
		}

		public StreamConfig build() {

			assertNotNull(bootstrapServers);
@@ -67,6 +79,10 @@ public class StreamConfig {
			assertNotNull(serviceId);
			assertNotNull(stateStoreDir);
			assertNotNull(topic);

			if (windowsTime == null) {
				windowsTime = DEFAULT_WINDOWS_TIME_MS;
			}
			return new StreamConfig(this);
		}
	}
@@ -90,4 +106,8 @@ public class StreamConfig {
	public String getServiceId() {
		return serviceId;
	}

	public long getWindowsTime() {
		return windowsTime;
	}
}
+3 −0
Original line number Diff line number Diff line
@@ -20,6 +20,8 @@ public abstract class BaseStreams {

	protected String serviceId;

	protected Long windowsTime;

	protected final String SCHEMA_REGISTRY_URL_PROPERTY = "schema.registry.url";

	protected KafkaStreams streams;
@@ -30,6 +32,7 @@ public abstract class BaseStreams {
		this.serviceId = config.getServiceId();
		this.bootstrapServers = config.getBootstrapServers();
		this.schemaRegistry = config.getSchemaRegistry();
		this.windowsTime = config.getWindowsTime();
	}

	protected void init() {