Commit 9234e42a authored by Arjun Satish's avatar Arjun Satish
Browse files

Merge branch '4.0.x'

parents a555b3b4 57ce3faa
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
@@ -61,6 +61,20 @@ Connector
  * Default: 100
  * Importance: low

``connection.timeout.ms``
  How long to wait in milliseconds when establishing a connection to the Elasticsearch server. The task fails if the client fails to connect to the server in this interval, and will need to be restarted.

  * Type: int
  * Default: 1000
  * Importance: low

``read.timeout.ms``
  How long to wait in milliseconds for the Elasticsearch server to send a response. The task fails if any read operation times out, and will need to be restarted to resume further operations.

  * Type: int
  * Default: 3000
  * Importance: low

Data Conversion
^^^^^^^^^^^^^^^

+31 −1
Original line number Diff line number Diff line
@@ -114,6 +114,17 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + "Prior to 3.3.0, this connector always wrote map entries as nested documents, "
      + "so set this to ``false`` to use that older behavior.";

  public static final String CONNECTION_TIMEOUT_MS_CONFIG = "connection.timeout.ms";
  public static final String READ_TIMEOUT_MS_CONFIG = "read.timeout.ms";
  private static final String CONNECTION_TIMEOUT_MS_CONFIG_DOC = "How long to wait "
                  + "in milliseconds when establishing a connection to the Elasticsearch server. "
                  + "The task fails if the client fails to connect to the server in this "
                  + "interval, and will need to be restarted.";
  private static final String READ_TIMEOUT_MS_CONFIG_DOC = "How long to wait in "
                  + "milliseconds for the Elasticsearch server to send a response. The task fails "
                  + "if any read operation times out, and will need to be restarted to resume "
                  + "further operations.";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
    addConnectorConfigs(configDef);
@@ -203,7 +214,26 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        ++order,
        Width.SHORT,
        "Retry Backoff (ms)"
      );
      ).define(
        CONNECTION_TIMEOUT_MS_CONFIG, 
        Type.INT, 
        1000, 
        Importance.LOW, 
        CONNECTION_TIMEOUT_MS_CONFIG_DOC,
        group, 
        ++order, 
        Width.SHORT, 
        "Connection Timeout"
        ).define(
        READ_TIMEOUT_MS_CONFIG, 
        Type.INT, 
        3000, 
        Importance.LOW, 
        READ_TIMEOUT_MS_CONFIG_DOC,
        group, 
        ++order, 
        Width.SHORT, 
        "Read Timeout");
  }

  private static void addConversionConfigs(ConfigDef configDef) {
+10 −4
Original line number Diff line number Diff line
@@ -104,14 +104,20 @@ public class ElasticsearchSinkTask extends SinkTask {
                TimeUnit.MILLISECONDS.toHours(maxRetryBackoffMs));
      }

      int connTimeout = config.getInt(
          ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
      int readTimeout = config.getInt(
          ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);

      if (client != null) {
        this.client = client;
      } else {
        List<String> address =
            config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(
            new HttpClientConfig.Builder(address)
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address)
            .connTimeout(connTimeout)
            .readTimeout(readTimeout)
            .multiThreaded(true)
            .build()
        );
+49 −0
Original line number Diff line number Diff line
package io.confluent.connect.elasticsearch;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSinkConnectorConfigTest {

  private Map<String, String> props;

  @Before
  public void setup() {
    props = new HashMap<>();
    props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, ElasticsearchSinkTestBase.TYPE);
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
    props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
  }

  @Test
  public void testDefaultHttpTimeoutsConfig() {
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
        (Integer) 3000
    );
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
        (Integer) 1000
    );
  }

  @Test
  public void testSetHttpTimeoutsConfig() {
    props.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, "10000");
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, "15000");
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
        (Integer) 10000
    );
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
        (Integer) 15000
    );
  }
}