Loading config/quickstart-elasticsearch.properties +1 −1 Original line number Diff line number Diff line Loading @@ -19,5 +19,5 @@ connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 topics=test-elasticsearch-sink key.ignore=true transport.addresses=localhost:9200 connection.url=localhost:9200 type.name=kafka-connect No newline at end of file src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +5 −4 Original line number Diff line number Diff line Loading @@ -29,9 +29,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String ELASTICSEARCH_GROUP = "Elasticsearch"; private static final String CONNECTOR_GROUP = "Connector"; public static final String HTTP_ADDRESSES_CONFIG = "http.addresses"; private static final String HTTP_ADDRESSES_DOC = "The list of addresses to connect to Elasticsearch."; private static final String HTTP_ADDRESSES_DISPLAY = "HTTP Addresses"; public static final String CONNECTION_URL_CONFIG = "connection.url"; private static final String CONNECTION_URL_DOC = "The URL to connect to Elasticsearch."; private static final String CONNECTION_URL_DISPLAY = "Connection URL"; public static final String TYPE_NAME_CONFIG = "type.name"; private static final String TYPE_NAME_DOC = "The type to use for each index."; Loading Loading @@ -119,7 +119,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { public static ConfigDef baseConfigDef() { return new ConfigDef() .define(HTTP_ADDRESSES_CONFIG, Type.STRING, Importance.HIGH, HTTP_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, HTTP_ADDRESSES_DISPLAY) .define(CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH, CONNECTION_URL_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, CONNECTION_URL_DISPLAY) .define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH, TYPE_NAME_DOC, ELASTICSEARCH_GROUP, 2, Width.SHORT, TYPE_NAME_DISPLAY) .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 3, Width.SHORT, KEY_IGNORE_DISPLAY) .define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY) Loading src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +1 −1 Original line number Diff line number Diff line Loading @@ -90,7 +90,7 @@ public class ElasticsearchSinkTask extends SinkTask { if (client != null) { this.client = client; } else { String address = config.getString(ElasticsearchSinkConnectorConfig.HTTP_ADDRESSES_CONFIG); String address = config.getString(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG); JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address).multiThreaded(true).build()); this.client = factory.getObject(); Loading src/main/java/io/confluent/connect/elasticsearch/internals/BulkProcessor.java +1 −2 Original line number Diff line number Diff line Loading @@ -190,7 +190,6 @@ public class BulkProcessor implements Runnable { if (batch.getAttempts() > maxRetry) { fail(batch, t); } else { log.info("Retry with exception:{}", t.getMessage()); requests.addFirst(batch); requests.notify(); } Loading Loading @@ -298,7 +297,7 @@ public class BulkProcessor implements Runnable { private final RecordBatch batch; BulkTask(RecordBatch batch) { log.debug("Batch size: {}", batch.size()); log.trace("Batch size: {}", batch.size()); this.batch = batch; } Loading src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase { private Map<String, String> createProps() { Map<String, String> props = new HashMap<>(); props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE); props.put(ElasticsearchSinkConnectorConfig.HTTP_ADDRESSES_CONFIG, "localhost"); props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost"); props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true"); return props; } Loading Loading
config/quickstart-elasticsearch.properties +1 −1 Original line number Diff line number Diff line Loading @@ -19,5 +19,5 @@ connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 topics=test-elasticsearch-sink key.ignore=true transport.addresses=localhost:9200 connection.url=localhost:9200 type.name=kafka-connect No newline at end of file
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +5 −4 Original line number Diff line number Diff line Loading @@ -29,9 +29,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String ELASTICSEARCH_GROUP = "Elasticsearch"; private static final String CONNECTOR_GROUP = "Connector"; public static final String HTTP_ADDRESSES_CONFIG = "http.addresses"; private static final String HTTP_ADDRESSES_DOC = "The list of addresses to connect to Elasticsearch."; private static final String HTTP_ADDRESSES_DISPLAY = "HTTP Addresses"; public static final String CONNECTION_URL_CONFIG = "connection.url"; private static final String CONNECTION_URL_DOC = "The URL to connect to Elasticsearch."; private static final String CONNECTION_URL_DISPLAY = "Connection URL"; public static final String TYPE_NAME_CONFIG = "type.name"; private static final String TYPE_NAME_DOC = "The type to use for each index."; Loading Loading @@ -119,7 +119,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { public static ConfigDef baseConfigDef() { return new ConfigDef() .define(HTTP_ADDRESSES_CONFIG, Type.STRING, Importance.HIGH, HTTP_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, HTTP_ADDRESSES_DISPLAY) .define(CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH, CONNECTION_URL_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, CONNECTION_URL_DISPLAY) .define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH, TYPE_NAME_DOC, ELASTICSEARCH_GROUP, 2, Width.SHORT, TYPE_NAME_DISPLAY) .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 3, Width.SHORT, KEY_IGNORE_DISPLAY) .define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY) Loading
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +1 −1 Original line number Diff line number Diff line Loading @@ -90,7 +90,7 @@ public class ElasticsearchSinkTask extends SinkTask { if (client != null) { this.client = client; } else { String address = config.getString(ElasticsearchSinkConnectorConfig.HTTP_ADDRESSES_CONFIG); String address = config.getString(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG); JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig.Builder(address).multiThreaded(true).build()); this.client = factory.getObject(); Loading
src/main/java/io/confluent/connect/elasticsearch/internals/BulkProcessor.java +1 −2 Original line number Diff line number Diff line Loading @@ -190,7 +190,6 @@ public class BulkProcessor implements Runnable { if (batch.getAttempts() > maxRetry) { fail(batch, t); } else { log.info("Retry with exception:{}", t.getMessage()); requests.addFirst(batch); requests.notify(); } Loading Loading @@ -298,7 +297,7 @@ public class BulkProcessor implements Runnable { private final RecordBatch batch; BulkTask(RecordBatch batch) { log.debug("Batch size: {}", batch.size()); log.trace("Batch size: {}", batch.size()); this.batch = batch; } Loading
src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase { private Map<String, String> createProps() { Map<String, String> props = new HashMap<>(); props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE); props.put(ElasticsearchSinkConnectorConfig.HTTP_ADDRESSES_CONFIG, "localhost"); props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost"); props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true"); return props; } Loading