Commit e9a9470a authored by Arjun Satish's avatar Arjun Satish
Browse files

CC-1372: Add configuration options for read and connect timeouts



Signed-off-by: default avatarArjun Satish <arjun@confluent.io>
parent c8dd6808
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
@@ -61,6 +61,20 @@ Connector
  * Default: 100
  * Importance: low

``conn.timeout.ms``
  How long to wait (in milliseconds) when establishing a connection to the Elasticsearch server.

  * Type: int
  * Default: 1000
  * Importance: low

``read.timeout.ms``
  How long to wait (in milliseconds) for the Elasticsearch server to send responses.

  * Type: int
  * Default: 3000
  * Importance: low

Data Conversion
^^^^^^^^^^^^^^^

+9 −1
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  public static final String TOPIC_KEY_IGNORE_CONFIG = "topic.key.ignore";
  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";
  public static final String CONNECTION_TIMEOUT_MS_CONFIG = "conn.timeout.ms";
  public static final String READ_TIMEOUT_MS_CONFIG = "read.timeout.ms";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -80,7 +82,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
          .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, Importance.LOW,
                  "How long to wait in milliseconds before attempting to retry a failed indexing request. "
                  + "This avoids retrying in a tight loop under failure scenarios.",
                  group, ++order, Width.SHORT, "Retry Backoff (ms)");
                  group, ++order, Width.SHORT, "Retry Backoff (ms)")
          .define(CONNECTION_TIMEOUT_MS_CONFIG, Type.INT, 1000, Importance.LOW, "How long to wait "
                  + "(in milliseconds) when establishing a connection to the Elasticsearch server.",
                  group, ++order, Width.SHORT, "Connection Timeout")
          .define(READ_TIMEOUT_MS_CONFIG, Type.INT, 3000, Importance.LOW, "How long to wait (in "
                  + "milliseconds) for the Elasticsearch server to send responses.",
                  group, ++order, Width.SHORT, "Read Timeout");
    }

    {
+6 −1
Original line number Diff line number Diff line
@@ -73,13 +73,18 @@ public class ElasticsearchSinkTask extends SinkTask {
      int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
      long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
      int connTimeout = config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
      int readTimeout = config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);

      if (client != null) {
        this.client = client;
      } else {
        List<String> address = config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address).multiThreaded(true).build());
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address)
            .connTimeout(connTimeout)
            .readTimeout(readTimeout)
            .multiThreaded(true).build());
        this.client = factory.getObject();
      }

+49 −0
Original line number Diff line number Diff line
package io.confluent.connect.elasticsearch;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSinkConnectorConfigTest {

  private Map<String, String> createProps() {
    Map<String, String> props = new HashMap<>();
    props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG,
        ElasticsearchSinkTestBase.TYPE);
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
    props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
    return props;
  }

  @Test
  public void testDefaultHttpTimeoutsConfig() {
    Map<String, String> props = createProps();
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
        (Integer) 3000
    );
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
        (Integer) 1000
    );
  }

  @Test
  public void testSetHttpTimeoutsConfig() {
    Map<String, String> props = createProps();
    props.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, "10000");
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, "15000");
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
        (Integer) 10000
    );
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
        (Integer) 15000
    );
  }
}