Commit 8044a806 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade utilidades para la ejecución de streams

parent 03257be5
Loading
Loading
Loading
Loading
+21 −0
Original line number Diff line number Diff line
package es.redmic.kafkastreams.vesselrestrictionchecker.common;

import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

	@Override
	public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
		// Workaround: We must ensure that the parallelism is set to >= 2. There seems
		// to be a known
		// issue with RocksDB where explicitly setting the parallelism to 1 causes
		// issues (even though
		// 1 seems to be RocksDB's default for this configuration).
		int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
		// Set number of compaction threads (but not flush threads).
		options.setIncreaseParallelism(compactionParallelism);
	}
}
+57 −0
Original line number Diff line number Diff line
package es.redmic.kafkastreams.vesselrestrictionchecker.common;

import java.util.ArrayList;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

import es.redmic.kafkastreams.vesselrestrictionchecker.utils.StreamsApplicationUtils;

public abstract class StreamsApplicationBase {
	
	@SuppressWarnings("serial")
	protected static ArrayList<Option> commandLineBaseOptions = new ArrayList<Option>() {{
	    add(new Option("appId", true, "stream application identifier"));
	    add(new Option("bootstrapServers", true, "kafka servers"));
	    add(new Option("schemaRegistry", true, "schema registry server"));
	}};
	
	protected static StreamsBuilder builder = new StreamsBuilder();
	
	protected static void startStream(CommandLine cmd) {
		
		KafkaStreams streams = new KafkaStreams(builder.build(),
        		StreamsApplicationUtils.getStreamConfig(cmd));
        
        streams.setUncaughtExceptionHandler(
				(Thread thread, Throwable throwable) -> uncaughtException(thread, throwable, streams));
        
        streams.start();

        addShutdownHookAndBlock(streams);
	}

	protected static void addShutdownHookAndBlock(KafkaStreams streams) {

		Thread.currentThread().setUncaughtExceptionHandler((t, e) -> uncaughtException(t, e, streams));

		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("Parando stream por señal SIGTERM");
				streams.close();
			}
		}));
	}

	protected static void uncaughtException(Thread thread, Throwable throwable, KafkaStreams streams) {

		String msg = "Error no conocido en kafka stream. El stream dejará de funcionar "
				+ throwable.getLocalizedMessage();
		System.out.println(msg);
		throwable.printStackTrace();
		streams.close();
	}
}