Commit c5f35307 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

Allow for multiple Elasticsearch HTTP URLs

Fixes #46
parent aa03c8b1
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -5,9 +5,9 @@ Connector
^^^^^^^^^

``connection.url``
  Elasticsearch HTTP connection URL e.g. ``http://eshost:9200``.
  List of Elasticsearch HTTP connection URLs e.g. ``http://eshost1:9200,http://eshost2:9200``.

  * Type: string
  * Type: list
  * Importance: high

``batch.size``
+4 −4
Original line number Diff line number Diff line
@@ -48,9 +48,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      final String group = "Connector";
      int order = 0;
      configDef
          .define(CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH,
                  "Elasticsearch HTTP connection URL e.g. ``http://eshost:9200``.",
                  group, ++order, Width.LONG, "Connection URL")
          .define(CONNECTION_URL_CONFIG, Type.LIST, Importance.HIGH,
                  "List of Elasticsearch HTTP connection URLs e.g. ``http://eshost1:9200,http://eshost2:9200``.",
                  group, ++order, Width.LONG, "Connection URLs")
          .define(BATCH_SIZE_CONFIG, Type.INT, 2000, Importance.MEDIUM,
                  "The number of records to process as a batch when writing to Elasticsearch.",
                  group, ++order, Width.SHORT, "Batch Size")
@@ -122,6 +122,6 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  }

  public static void main(String[] args) {
    System.out.println(CONFIG.toRst());
    System.out.println(CONFIG.toEnrichedRst());
  }
}
+1 −1
Original line number Diff line number Diff line
@@ -77,7 +77,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      if (client != null) {
        this.client = client;
      } else {
        String address = config.getString(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
        List<String> address = config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address).multiThreaded(true).build());
        this.client = factory.getObject();