Commit 3b47a9de authored by Noel Alonso's avatar Noel Alonso
Browse files

Setea JoinWindows desde propiedad de la config

De este modo se podrá configurar esta propiedad desde las properties de
Spring
parent 51533c2f
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -56,6 +56,9 @@ public class VesselCommandHandler extends CommandHandler {
	@Value("${broker.topic.vessel-type}")
	private String vessel_type_topic;

	@Value("${stream.windows.time.ms}")
	private Long streamWindowsTime;

	private VesselStateStore vesselStateStore;

	@Autowired
@@ -91,6 +94,7 @@ public class VesselCommandHandler extends CommandHandler {
					.serviceId(vessels_events_stream_id)
					.stateStoreDir(state_store_vessels_dir)
					.topic(vessel_topic)
					.windowsTime(streamWindowsTime)
					.build(), vessel_type_topic, alertService);
		// @formatter:on
	}
+4 −0
Original line number Diff line number Diff line
@@ -53,6 +53,9 @@ public class VesselTypeCommandHandler extends CommandHandler {
	@Value("${broker.stream.events.vesseltypes.id}")
	private String vesseltypes_events_stream_id;

	@Value("${stream.windows.time.ms}")
	private Long streamWindowsTime;

	private VesselTypeStateStore vesselTypeStateStore;

	@Autowired
@@ -85,6 +88,7 @@ public class VesselTypeCommandHandler extends CommandHandler {
				.serviceId(vesseltypes_events_stream_id)
				.stateStoreDir(state_store_vesseltypes_dir)
				.topic(vessel_type_topic)
				.windowsTime(streamWindowsTime)
				.build(), alertService);
		
		// @formatter:on
+2 −3
Original line number Diff line number Diff line
package es.redmic.vesselscommands.streams;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -56,7 +55,7 @@ public class VesselEventStreams extends EventStreams {
		// Join por id, mandando a kafka el evento de éxito
		createConfirmedEvents.join(createRequestEvents,
				(confirmedEvent, requestEvent) -> getCreatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(TimeUnit.SECONDS.toMillis(60))).to(topic);
				JoinWindows.of(windowsTime)).to(topic);
	}

	private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) {
@@ -105,7 +104,7 @@ public class VesselEventStreams extends EventStreams {
		// Join por id, mandando a kafka el evento de éxito
		updateConfirmedEvents.join(updateRequestEvents,
				(confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(TimeUnit.SECONDS.toMillis(60))).to(topic);
				JoinWindows.of(windowsTime)).to(topic);
	}

	private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) {
+2 −4
Original line number Diff line number Diff line
package es.redmic.vesselscommands.streams;

import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -44,7 +42,7 @@ public class VesselTypeEventStreams extends EventStreams {
		// Join por id, mandando a kafka el evento de éxito
		createConfirmedEvents.join(createRequestEvents,
				(confirmedEvent, requestEvent) -> getCreatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(TimeUnit.SECONDS.toMillis(60))).to(topic);
				JoinWindows.of(windowsTime)).to(topic);
	}

	private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) {
@@ -93,7 +91,7 @@ public class VesselTypeEventStreams extends EventStreams {
		// Join por id, mandando a kafka el evento de éxito
		updateConfirmedEvents.join(updateRequestEvents,
				(confirmedEvent, requestEvent) -> getUpdatedEvent(confirmedEvent, requestEvent),
				JoinWindows.of(TimeUnit.SECONDS.toMillis(60))).to(topic);
				JoinWindows.of(windowsTime)).to(topic);
	}

	private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) {
+1 −1
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvr
#spring.kafka.properties.producer.request.timeout.ms=

eventsource.timeout.ms=120000

stream.windows.time.ms=120000

spring.kafka.properties.specific.avro.reader=true