Commit 5d43a7ad authored by Noel Alonso's avatar Noel Alonso
Browse files

Filtra valores nulos y añade auto.offset.reset

parent 1a575af9
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@ services:
      APP_ID:
      BOOTSTRAP_SERVERS:
      SCHEMA_REGISTRY:
      AUTO_OFFSET_RESET:
      AREAS_TOPIC:
      POINTS_TOPIC:
      RESULT_TOPIC:
+7 −4
Original line number Diff line number Diff line
@@ -86,7 +86,8 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
				.reduce((aggValue, newValue) -> newValue, Materialized.with(null, getGenericAvroSerde())).toStream();

		KStream<String, GenericRecord> areasKStreamEnriched = lastAreasKStream
				.flatMapValues((value) -> enrichAreaWithGeoHash(value)).selectKey((key, value) -> getGeoHashKey(value));
				.flatMapValues((value) -> enrichAreaWithGeoHash(value)).filter((k, v) -> (v != null))
				.selectKey((key, value) -> getGeoHashKey(value));

		KTable<String, HashMap<String, GenericRecord>> areasKTableAgg = areasKStreamEnriched
				.groupByKey(Serialized.with(null, getGenericAvroSerde())).aggregate(HashMap<String, GenericRecord>::new,
@@ -95,7 +96,8 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
						Materialized.with(null, hashMapSerde));

		KStream<String, GenericRecord> pointsStreamEnriched = pointsStream
				.mapValues(value -> enrichPointWithGeoHash(value)).selectKey((k, v) -> getGeoHashKey(v));
				.mapValues(value -> enrichPointWithGeoHash(value)).filter((k, v) -> (v != null))
				.selectKey((k, v) -> getGeoHashKey(v));

		pointsStreamEnriched
				.join(areasKTableAgg, (point, areas) -> getPointInAreaAlert(point, areas),
@@ -289,7 +291,8 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
    			resultTopic = (String) env.get(RESULT_TOPIC),
				appId = (String) env.get(APP_ID),
				bootstrapServers = (String) env.get(BOOTSTRAP_SERVERS),
				schemaRegistryUrl = (String) env.get(SCHEMA_REGISTRY);
				schemaRegistryUrl = (String) env.get(SCHEMA_REGISTRY),
				autoOffsetReset = (String) env.get(AUTO_OFFSET_RESET);
    	// @formatter:on

		System.out.format("Load config...%n");
@@ -306,7 +309,7 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase

		Topology topology = app.getTopology(pointsTopic, areasTopic, resultTopic);

		Properties props = app.getKafkaProperties(appId, bootstrapServers);
		Properties props = app.getKafkaProperties(appId, bootstrapServers, autoOffsetReset);

		app.startStreams(topology, props);
	}
+5 −3
Original line number Diff line number Diff line
@@ -32,7 +32,8 @@ public abstract class StreamsApplicationBase {

	protected static final String APP_ID = "APP_ID",
			BOOTSTRAP_SERVERS = "BOOTSTRAP_SERVERS",
			SCHEMA_REGISTRY = "SCHEMA_REGISTRY";
			SCHEMA_REGISTRY = "SCHEMA_REGISTRY",
			AUTO_OFFSET_RESET = "AUTO_OFFSET_RESET";
	// @formatter:on

	@SuppressWarnings("serial")
@@ -41,6 +42,7 @@ public abstract class StreamsApplicationBase {
			put(APP_ID, "Stream application identifier");
			put(BOOTSTRAP_SERVERS, "Kafka servers");
			put(SCHEMA_REGISTRY, "Schema registry server");
			put(AUTO_OFFSET_RESET, "auto.offset.reset consumer property");
		}
	};

@@ -56,10 +58,10 @@ public abstract class StreamsApplicationBase {
		addShutdownHookAndBlock(streams);
	}

	public Properties getKafkaProperties(String appId, String bootstrapServers) {
	public Properties getKafkaProperties(String appId, String bootstrapServers, String autoOffsetReset) {

		// Sobrescribir método o añadir aquí properties específicas si fuera necesario
		return StreamsApplicationUtils.getStreamConfig(appId, bootstrapServers, schemaRegistryUrl);
		return StreamsApplicationUtils.getStreamConfig(appId, bootstrapServers, schemaRegistryUrl, autoOffsetReset);
	}

	protected void addShutdownHookAndBlock(KafkaStreams streams) {
+5 −3
Original line number Diff line number Diff line
@@ -16,11 +16,13 @@ public class StreamsApplicationUtils {
	// @formatter:off

	protected final static String SCHEMA_REGISTRY_URL_PROPERTY = "schema.registry.url",
			SCHEMA_REGISTRY_VALUE_SUBJECT_NAME_STRATEGY = "value.subject.name.strategy";
			SCHEMA_REGISTRY_VALUE_SUBJECT_NAME_STRATEGY = "value.subject.name.strategy",
			AUTO_OFFSET_RESET = "auto.offset.reset";

	// @formatter:on

	public static Properties getStreamConfig(String appId, String bootstrapServers, String schemaRegistry) {
	public static Properties getStreamConfig(String appId, String bootstrapServers, String schemaRegistry,
			String autoOffsetReset) {

		Properties config = new Properties();

@@ -28,7 +30,7 @@ public class StreamsApplicationUtils {
		config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
		config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		// config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);

		config.put(AUTO_OFFSET_RESET, autoOffsetReset);
		config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1); // commit as fast as possible

		config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+3 −2
Original line number Diff line number Diff line
@@ -38,7 +38,8 @@ public class VesselRestrictionCheckerApplicationTest {
	private static final String POINT_TOPIC = "pointsTopic",
			AREAS_TOPIC = "areasTopic",
			RESULT_TOPIC = "resultTopic",
			SCHEMA_REGISTRY_URL = "http://dummy";
			SCHEMA_REGISTRY_URL = "http://dummy",
			AUTO_OFFSET_RESET = "earliest";

	// @formatter:on

@@ -71,7 +72,7 @@ public class VesselRestrictionCheckerApplicationTest {

		Topology topology = app.getTopology(POINT_TOPIC, AREAS_TOPIC, RESULT_TOPIC);

		Properties props = app.getKafkaProperties("appId", "localhost:9092");
		Properties props = app.getKafkaProperties("appId", "localhost:9092", AUTO_OFFSET_RESET);

		testDriver = new TopologyTestDriver(topology, props);