Loading pom.xml +0 −7 Original line number Diff line number Diff line Loading @@ -27,8 +27,6 @@ <licenseName>apache_v2</licenseName> <organizationName>REDMIC Project / Server</organizationName> <commons-cli.version>1.4</commons-cli.version> <kafka.version>2.0.1</kafka.version> <avro.version>1.8.2</avro.version> <confluent.version>5.3.0</confluent.version> Loading @@ -45,11 +43,6 @@ <inceptionYear>2019</inceptionYear> <dependencies> <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>${commons-cli.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> Loading src/main/java/es/redmic/vesselrestrictionchecker/VesselRestrictionCheckerApplication.java +43 −42 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker; import java.util.ArrayList; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.kafka.streams.kstream.KStream; import java.util.HashMap; import java.util.Map; import es.redmic.vesselrestrictionchecker.common.StreamsApplicationBase; import es.redmic.vesselrestrictionchecker.utils.StreamsApplicationUtils; public class VesselRestrictionCheckerApplication extends StreamsApplicationBase { // @formatter:off private static final String AREAS_TOPIC = "areasTopic", POINTS_TOPIC = "pointsTopic", RESULT_TOPIC = "resultTopic"; private static final String AREAS_TOPIC = "AREAS_TOPIC", POINTS_TOPIC = "POINTS_TOPIC", RESULT_TOPIC = "RESULT_TOPIC"; // @formatter:on @SuppressWarnings("serial") private static ArrayList<Option> commandLineOptions = new ArrayList<Option>() {{ addAll(commandLineBaseOptions); add(new Option(AREAS_TOPIC, true, "geofencing topic")); add(new Option(POINTS_TOPIC, true, "points to check topic")); add(new Option(RESULT_TOPIC, true, "result topic")); }}; /** * * @param args * * */ private static HashMap<String, Object> requiredVariables = new HashMap<String, Object>() { { putAll(requiredVariablesBase); put(AREAS_TOPIC, "Kafka topic for receiving geofencing areas"); put(POINTS_TOPIC, "Kafka topic for receiving points to check"); put(RESULT_TOPIC, "Kafka topic for sending checking result"); } }; public static void main(String[] args) { CommandLine commandLineArgs = StreamsApplicationUtils.getCommandLineArgs(args, commandLineOptions); Map<String, Object> env = getEnvVariables(requiredVariables); // @formatter:off String areasTopic = commandLineArgs.getOptionValue(AREAS_TOPIC), pointsTopic = commandLineArgs.getOptionValue(POINTS_TOPIC), resultTopic = commandLineArgs.getOptionValue(RESULT_TOPIC); String areasTopic = (String) env.get(AREAS_TOPIC), pointsTopic = (String) env.get(POINTS_TOPIC), resultTopic = (String) env.get(RESULT_TOPIC), appId = (String) env.get(APP_ID), bootstrapServers = (String) env.get(BOOTSTRAP_SERVERS), schemaRegistry = (String) env.get(SCHEMA_REGISTRY); // @formatter:on KStream<String, String> areasStream = builder.stream(areasTopic), pointsStream = builder.stream(pointsTopic); System.out.format("Load config...%n"); System.out.format("%s: %s%n", requiredVariables.get(AREAS_TOPIC), areasTopic); System.out.format("%s: %s%n", requiredVariables.get(POINTS_TOPIC), pointsTopic); System.out.format("%s: %s%n", requiredVariables.get(RESULT_TOPIC), resultTopic); /*-KStream<String, String> areasStream = builder.stream(areasTopic), pointsStream = builder.stream(pointsTopic);-*/ // TODO: crear stream startStream(commandLineArgs); startStream(appId, bootstrapServers, schemaRegistry); } } src/main/java/es/redmic/vesselrestrictionchecker/common/StreamsApplicationBase.java +52 −23 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker.common; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; Loading @@ -11,19 +10,32 @@ import es.redmic.vesselrestrictionchecker.utils.StreamsApplicationUtils; public abstract class StreamsApplicationBase { // @formatter:off protected static final String APP_ID = "APP_ID", BOOTSTRAP_SERVERS = "BOOTSTRAP_SERVERS", SCHEMA_REGISTRY = "SCHEMA_REGISTRY"; // @formatter:on @SuppressWarnings("serial") protected static ArrayList<Option> commandLineBaseOptions = new ArrayList<Option>() {{ add(new Option("appId", true, "stream application identifier")); add(new Option("bootstrapServers", true, "kafka servers")); add(new Option("schemaRegistry", true, "schema registry server")); }}; protected static HashMap<String, Object> requiredVariablesBase = new HashMap<String, Object>() { { put(APP_ID, "Stream application identifier"); put(BOOTSTRAP_SERVERS, "Kafka servers"); put(SCHEMA_REGISTRY, "Schema registry server"); } }; protected static StreamsBuilder builder = new StreamsBuilder(); protected static void startStream(CommandLine cmd) { protected static void startStream(String appId, String bootstrapServers, String schemaRegistry) { System.out.format("Kafka streams starting...%n"); System.out.format("BootstrapServers: %s, SchemaRegistry: %s, AppId: %s%n", bootstrapServers, schemaRegistry, appId); KafkaStreams streams = new KafkaStreams(builder.build(), StreamsApplicationUtils.getStreamConfig(cmd)); StreamsApplicationUtils.getStreamConfig(appId, bootstrapServers, schemaRegistry)); streams.setUncaughtExceptionHandler( (Thread thread, Throwable throwable) -> uncaughtException(thread, throwable, streams)); Loading @@ -40,7 +52,7 @@ public abstract class StreamsApplicationBase { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { System.out.println("Parando stream por señal SIGTERM"); System.out.println("Stopping stream. SIGTERM signal"); streams.close(); } })); Loading @@ -48,10 +60,27 @@ public abstract class StreamsApplicationBase { protected static void uncaughtException(Thread thread, Throwable throwable, KafkaStreams streams) { String msg = "Error no conocido en kafka stream. El stream dejará de funcionar " + throwable.getLocalizedMessage(); System.out.println(msg); System.err.println("Error. The stream will stop working " + throwable.getLocalizedMessage()); throwable.printStackTrace(); streams.close(); } protected static Map<String, Object> getEnvVariables(HashMap<String, Object> variablesRequired) { Map<String, Object> envVariables = new HashMap<>(); for (String key : variablesRequired.keySet()) { String value = System.getenv(key); if (value == null) { System.err.println("Error=Enviroment variable " + key + " not assigned. Description: " + variablesRequired.get(key)); System.exit(1); } envVariables.put(key, value); } return envVariables; } } src/main/java/es/redmic/vesselrestrictionchecker/utils/StreamsApplicationUtils.java +1 −39 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker.utils; import java.util.List; import java.util.Properties; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; Loading @@ -27,36 +18,7 @@ public class StreamsApplicationUtils { // @formatter:on public static CommandLine getCommandLineArgs(String[] args, List<Option> optionList) { Options options = new Options(); for (Option option : optionList) { option.setRequired(true); options.addOption(option); } CommandLineParser parser = new DefaultParser(); HelpFormatter formatter = new HelpFormatter(); CommandLine cmd = null; try { cmd = parser.parse(options, args); } catch (ParseException e) { System.out.println(e.getMessage()); formatter.printHelp("utility-name", options); System.exit(1); } return cmd; } public static Properties getStreamConfig(CommandLine cmd) { String appId = cmd.getOptionValue("appId"); String bootstrapServers = cmd.getOptionValue("bootstrapServers"); String schemaRegistry = cmd.getOptionValue("schemaRegistry"); public static Properties getStreamConfig(String appId, String bootstrapServers, String schemaRegistry) { Properties config = new Properties(); Loading Loading
pom.xml +0 −7 Original line number Diff line number Diff line Loading @@ -27,8 +27,6 @@ <licenseName>apache_v2</licenseName> <organizationName>REDMIC Project / Server</organizationName> <commons-cli.version>1.4</commons-cli.version> <kafka.version>2.0.1</kafka.version> <avro.version>1.8.2</avro.version> <confluent.version>5.3.0</confluent.version> Loading @@ -45,11 +43,6 @@ <inceptionYear>2019</inceptionYear> <dependencies> <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>${commons-cli.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> Loading
src/main/java/es/redmic/vesselrestrictionchecker/VesselRestrictionCheckerApplication.java +43 −42 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker; import java.util.ArrayList; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.kafka.streams.kstream.KStream; import java.util.HashMap; import java.util.Map; import es.redmic.vesselrestrictionchecker.common.StreamsApplicationBase; import es.redmic.vesselrestrictionchecker.utils.StreamsApplicationUtils; public class VesselRestrictionCheckerApplication extends StreamsApplicationBase { // @formatter:off private static final String AREAS_TOPIC = "areasTopic", POINTS_TOPIC = "pointsTopic", RESULT_TOPIC = "resultTopic"; private static final String AREAS_TOPIC = "AREAS_TOPIC", POINTS_TOPIC = "POINTS_TOPIC", RESULT_TOPIC = "RESULT_TOPIC"; // @formatter:on @SuppressWarnings("serial") private static ArrayList<Option> commandLineOptions = new ArrayList<Option>() {{ addAll(commandLineBaseOptions); add(new Option(AREAS_TOPIC, true, "geofencing topic")); add(new Option(POINTS_TOPIC, true, "points to check topic")); add(new Option(RESULT_TOPIC, true, "result topic")); }}; /** * * @param args * * */ private static HashMap<String, Object> requiredVariables = new HashMap<String, Object>() { { putAll(requiredVariablesBase); put(AREAS_TOPIC, "Kafka topic for receiving geofencing areas"); put(POINTS_TOPIC, "Kafka topic for receiving points to check"); put(RESULT_TOPIC, "Kafka topic for sending checking result"); } }; public static void main(String[] args) { CommandLine commandLineArgs = StreamsApplicationUtils.getCommandLineArgs(args, commandLineOptions); Map<String, Object> env = getEnvVariables(requiredVariables); // @formatter:off String areasTopic = commandLineArgs.getOptionValue(AREAS_TOPIC), pointsTopic = commandLineArgs.getOptionValue(POINTS_TOPIC), resultTopic = commandLineArgs.getOptionValue(RESULT_TOPIC); String areasTopic = (String) env.get(AREAS_TOPIC), pointsTopic = (String) env.get(POINTS_TOPIC), resultTopic = (String) env.get(RESULT_TOPIC), appId = (String) env.get(APP_ID), bootstrapServers = (String) env.get(BOOTSTRAP_SERVERS), schemaRegistry = (String) env.get(SCHEMA_REGISTRY); // @formatter:on KStream<String, String> areasStream = builder.stream(areasTopic), pointsStream = builder.stream(pointsTopic); System.out.format("Load config...%n"); System.out.format("%s: %s%n", requiredVariables.get(AREAS_TOPIC), areasTopic); System.out.format("%s: %s%n", requiredVariables.get(POINTS_TOPIC), pointsTopic); System.out.format("%s: %s%n", requiredVariables.get(RESULT_TOPIC), resultTopic); /*-KStream<String, String> areasStream = builder.stream(areasTopic), pointsStream = builder.stream(pointsTopic);-*/ // TODO: crear stream startStream(commandLineArgs); startStream(appId, bootstrapServers, schemaRegistry); } }
src/main/java/es/redmic/vesselrestrictionchecker/common/StreamsApplicationBase.java +52 −23 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker.common; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; Loading @@ -11,19 +10,32 @@ import es.redmic.vesselrestrictionchecker.utils.StreamsApplicationUtils; public abstract class StreamsApplicationBase { // @formatter:off protected static final String APP_ID = "APP_ID", BOOTSTRAP_SERVERS = "BOOTSTRAP_SERVERS", SCHEMA_REGISTRY = "SCHEMA_REGISTRY"; // @formatter:on @SuppressWarnings("serial") protected static ArrayList<Option> commandLineBaseOptions = new ArrayList<Option>() {{ add(new Option("appId", true, "stream application identifier")); add(new Option("bootstrapServers", true, "kafka servers")); add(new Option("schemaRegistry", true, "schema registry server")); }}; protected static HashMap<String, Object> requiredVariablesBase = new HashMap<String, Object>() { { put(APP_ID, "Stream application identifier"); put(BOOTSTRAP_SERVERS, "Kafka servers"); put(SCHEMA_REGISTRY, "Schema registry server"); } }; protected static StreamsBuilder builder = new StreamsBuilder(); protected static void startStream(CommandLine cmd) { protected static void startStream(String appId, String bootstrapServers, String schemaRegistry) { System.out.format("Kafka streams starting...%n"); System.out.format("BootstrapServers: %s, SchemaRegistry: %s, AppId: %s%n", bootstrapServers, schemaRegistry, appId); KafkaStreams streams = new KafkaStreams(builder.build(), StreamsApplicationUtils.getStreamConfig(cmd)); StreamsApplicationUtils.getStreamConfig(appId, bootstrapServers, schemaRegistry)); streams.setUncaughtExceptionHandler( (Thread thread, Throwable throwable) -> uncaughtException(thread, throwable, streams)); Loading @@ -40,7 +52,7 @@ public abstract class StreamsApplicationBase { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { System.out.println("Parando stream por señal SIGTERM"); System.out.println("Stopping stream. SIGTERM signal"); streams.close(); } })); Loading @@ -48,10 +60,27 @@ public abstract class StreamsApplicationBase { protected static void uncaughtException(Thread thread, Throwable throwable, KafkaStreams streams) { String msg = "Error no conocido en kafka stream. El stream dejará de funcionar " + throwable.getLocalizedMessage(); System.out.println(msg); System.err.println("Error. The stream will stop working " + throwable.getLocalizedMessage()); throwable.printStackTrace(); streams.close(); } protected static Map<String, Object> getEnvVariables(HashMap<String, Object> variablesRequired) { Map<String, Object> envVariables = new HashMap<>(); for (String key : variablesRequired.keySet()) { String value = System.getenv(key); if (value == null) { System.err.println("Error=Enviroment variable " + key + " not assigned. Description: " + variablesRequired.get(key)); System.exit(1); } envVariables.put(key, value); } return envVariables; } }
src/main/java/es/redmic/vesselrestrictionchecker/utils/StreamsApplicationUtils.java +1 −39 Original line number Diff line number Diff line package es.redmic.vesselrestrictionchecker.utils; import java.util.List; import java.util.Properties; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; Loading @@ -27,36 +18,7 @@ public class StreamsApplicationUtils { // @formatter:on public static CommandLine getCommandLineArgs(String[] args, List<Option> optionList) { Options options = new Options(); for (Option option : optionList) { option.setRequired(true); options.addOption(option); } CommandLineParser parser = new DefaultParser(); HelpFormatter formatter = new HelpFormatter(); CommandLine cmd = null; try { cmd = parser.parse(options, args); } catch (ParseException e) { System.out.println(e.getMessage()); formatter.printHelp("utility-name", options); System.exit(1); } return cmd; } public static Properties getStreamConfig(CommandLine cmd) { String appId = cmd.getOptionValue("appId"); String bootstrapServers = cmd.getOptionValue("bootstrapServers"); String schemaRegistry = cmd.getOptionValue("schemaRegistry"); public static Properties getStreamConfig(String appId, String bootstrapServers, String schemaRegistry) { Properties config = new Properties(); Loading