Commit 8502ca98 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade utilidad para arrancar esquema registry

De esta manera se puede usar un esquema registry arrancado en localhost
para la ejecución de tests de integración
parent 16ced6f9
Loading
Loading
Loading
Loading
+40 −9
Original line number Diff line number Diff line
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

	<parent>
@@ -31,6 +32,9 @@
		<jackson-datatype-jts.version>2.4</jackson-datatype-jts.version>
		<ma.glasnost.orika.core>1.5.2</ma.glasnost.orika.core>
		<powermock.version>1.7.3</powermock.version>
		<confluent.version>4.1.0</confluent.version>
		<jersey-hk2.version>2.26</jersey-hk2.version>
		<jersey-guava.version>2.26-b03</jersey-guava.version>

		<!-- Environment variables -->
		<env.MAVEN_REPO_URL>https://artifactory.redmic.net/artifactory</env.MAVEN_REPO_URL>
@@ -95,6 +99,26 @@
			<version>${powermock.version}</version>
		</dependency>

		<!-- Schema Registry for testing -->

		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-schema-registry</artifactId>
			<version>${confluent.version}</version>
		</dependency>

		<dependency>
			<groupId>org.glassfish.jersey.inject</groupId>
			<artifactId>jersey-hk2</artifactId>
			<version>${jersey-hk2.version}</version>
		</dependency>

		<dependency>
			<groupId>org.glassfish.jersey.bundles.repackaged</groupId>
			<artifactId>jersey-guava</artifactId>
			<version>${jersey-guava.version}</version>
		</dependency>

		<!--test de documentación -->

		<dependency>
@@ -115,4 +139,11 @@
			<uniqueVersion>false</uniqueVersion>
		</snapshotRepository>
	</distributionManagement>
	<repositories>
		<repository>
			<id>confluent</id>
			<name>Confluent</name>
			<url>https://packages.confluent.io/maven/</url>
		</repository>
	</repositories>
</project>
 No newline at end of file
+97 −0
Original line number Diff line number Diff line
package es.redmic.testutils.schemaregistry;

import java.util.Properties;

import org.eclipse.jetty.server.Server;

import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;

public class RestApp {

	public final Properties prop;
	public RestService restClient;
	public SchemaRegistryRestApplication restApp;
	public Server restServer;
	public String restConnect;

	public RestApp(int port, String url, String zkConnect, String bootstrapBrokers, String compatibilityType) {
		this(port, url, zkConnect, bootstrapBrokers, compatibilityType, null);
	}

	public RestApp(int port, String url, String zkConnect, String bootstrapBrokers, String compatibilityType,
			Properties schemaRegistryProps) {

		bootstrapBrokers = bootstrapBrokers.replaceAll("127", "PLAINTEXT://127");

		prop = new Properties();
		if (schemaRegistryProps != null) {
			prop.putAll(schemaRegistryProps);
		}
		prop.setProperty(SchemaRegistryConfig.PORT_CONFIG, ((Integer) port).toString());
		if (zkConnect != null) {
			prop.setProperty(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
		}
		if (bootstrapBrokers != null) {
			prop.setProperty(SchemaRegistryConfig.KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG, bootstrapBrokers);
		}
		prop.put(SchemaRegistryConfig.KAFKASTORE_TOPIC_REPLICATION_FACTOR_CONFIG, 1);
		prop.put(SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG, SchemaRegistryConfig.DEFAULT_KAFKASTORE_TOPIC);
		prop.put(SchemaRegistryConfig.COMPATIBILITY_CONFIG, compatibilityType);
		prop.put(SchemaRegistryConfig.MASTER_ELIGIBILITY, true);
		prop.put(SchemaRegistryConfig.LISTENERS_CONFIG, url);
	}

	public void start() throws Exception {
		restApp = new SchemaRegistryRestApplication(prop);
		restServer = restApp.createServer();
		restServer.start();
		restConnect = restServer.getURI().toString();
		if (restConnect.endsWith("/"))
			restConnect = restConnect.substring(0, restConnect.length() - 1);
		restClient = new RestService(restConnect);
	}

	public void stop() throws Exception {
		restClient = null;
		if (restServer != null) {
			restServer.stop();
			restServer.join();
		}
	}

	/**
	 * This method must be called before calling {@code RestApp.start()} for the
	 * additional properties to take affect.
	 *
	 * @param props
	 *            the additional properties to set
	 */
	public void addConfigs(Properties props) {
		prop.putAll(props);
	}

	public boolean isMaster() {
		return restApp.schemaRegistry().isMaster();
	}

	public void setMaster(SchemaRegistryIdentity schemaRegistryIdentity) throws SchemaRegistryException {
		restApp.schemaRegistry().setMaster(schemaRegistryIdentity);
	}

	public SchemaRegistryIdentity myIdentity() {
		return restApp.schemaRegistry().myIdentity();
	}

	public SchemaRegistryIdentity masterIdentity() {
		return restApp.schemaRegistry().masterIdentity();
	}

	public SchemaRegistry schemaRegistry() {
		return restApp.schemaRegistry();
	}
}