Unverified Commit 70c36284 authored by Arjun Satish's avatar Arjun Satish Committed by GitHub
Browse files

Merge pull request #158 from wicknicks/CC-1372

CC-1372: Add configuration options for read and connect timeouts
parents c8dd6808 77a08926
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
@@ -61,6 +61,20 @@ Connector
  * Default: 100
  * Importance: low

``connection.timeout.ms``
  How long to wait in milliseconds when establishing a connection to the Elasticsearch server. The task fails if the client fails to connect to the server in this interval, and will need to be restarted.

  * Type: int
  * Default: 1000
  * Importance: low

``read.timeout.ms``
  How long to wait in milliseconds for the Elasticsearch server to send a response. The task fails if any read operation times out, and will need to be restarted to resume further operations.

  * Type: int
  * Default: 3000
  * Importance: low

Data Conversion
^^^^^^^^^^^^^^^

+13 −1
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  public static final String TOPIC_KEY_IGNORE_CONFIG = "topic.key.ignore";
  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";
  public static final String CONNECTION_TIMEOUT_MS_CONFIG = "connection.timeout.ms";
  public static final String READ_TIMEOUT_MS_CONFIG = "read.timeout.ms";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -80,7 +82,17 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
          .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, Importance.LOW,
                  "How long to wait in milliseconds before attempting to retry a failed indexing request. "
                  + "This avoids retrying in a tight loop under failure scenarios.",
                  group, ++order, Width.SHORT, "Retry Backoff (ms)");
                  group, ++order, Width.SHORT, "Retry Backoff (ms)")
          .define(CONNECTION_TIMEOUT_MS_CONFIG, Type.INT, 1000, Importance.LOW, "How long to wait "
                  + "in milliseconds when establishing a connection to the Elasticsearch server. "
                  + "The task fails if the client fails to connect to the server in this "
                  + "interval, and will need to be restarted.",
                  group, ++order, Width.SHORT, "Connection Timeout")
          .define(READ_TIMEOUT_MS_CONFIG, Type.INT, 3000, Importance.LOW, "How long to wait in "
                  + "milliseconds for the Elasticsearch server to send a response. The task fails "
                  + "if any read operation times out, and will need to be restarted to resume "
                  + "further operations.",
                  group, ++order, Width.SHORT, "Read Timeout");
    }

    {
+8 −1
Original line number Diff line number Diff line
@@ -73,13 +73,20 @@ public class ElasticsearchSinkTask extends SinkTask {
      int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
      long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
      int connTimeout = config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
      int readTimeout = config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);

      if (client != null) {
        this.client = client;
      } else {
        List<String> address = config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address).multiThreaded(true).build());
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address)
            .connTimeout(connTimeout)
            .readTimeout(readTimeout)
            .multiThreaded(true)
            .build()
        );
        this.client = factory.getObject();
      }

+49 −0
Original line number Diff line number Diff line
package io.confluent.connect.elasticsearch;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSinkConnectorConfigTest {

  private Map<String, String> props;

  @Before
  public void setup() {
    props = new HashMap<>();
    props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, ElasticsearchSinkTestBase.TYPE);
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
    props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
  }

  @Test
  public void testDefaultHttpTimeoutsConfig() {
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
        (Integer) 3000
    );
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
        (Integer) 1000
    );
  }

  @Test
  public void testSetHttpTimeoutsConfig() {
    props.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, "10000");
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, "15000");
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
        (Integer) 10000
    );
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
        (Integer) 15000
    );
  }
}