Commit 73bb7ba0 authored by Arjun Satish's avatar Arjun Satish
Browse files

Merge remote-tracking branch 'origin/3.2.x' into 3.3.x



Signed-off-by: default avatarArjun Satish <arjun@confluent.io>
parents b25dc455 70c36284
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
@@ -61,6 +61,20 @@ Connector
  * Default: 100
  * Importance: low

``connection.timeout.ms``
  How long to wait in milliseconds when establishing a connection to the Elasticsearch server. The task fails if the client fails to connect to the server in this interval, and will need to be restarted.

  * Type: int
  * Default: 1000
  * Importance: low

``read.timeout.ms``
  How long to wait in milliseconds for the Elasticsearch server to send a response. The task fails if any read operation times out, and will need to be restarted to resume further operations.

  * Type: int
  * Default: 3000
  * Importance: low

Data Conversion
^^^^^^^^^^^^^^^

+13 −1
Original line number Diff line number Diff line
@@ -41,6 +41,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";
  public static final String COMPACT_MAP_ENTRIES_CONFIG = "compact.map.entries";
  public static final String CONNECTION_TIMEOUT_MS_CONFIG = "connection.timeout.ms";
  public static final String READ_TIMEOUT_MS_CONFIG = "read.timeout.ms";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -82,7 +84,17 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
                  "How long to wait in milliseconds before attempting the first retry of a failed indexing request. "
                  + "Upon a failure, this connector may wait up to twice as long as the previous wait, up to the maximum number of retries. "
                  + "This avoids retrying in a tight loop under failure scenarios.",
                  group, ++order, Width.SHORT, "Retry Backoff (ms)");
                  group, ++order, Width.SHORT, "Retry Backoff (ms)")
          .define(CONNECTION_TIMEOUT_MS_CONFIG, Type.INT, 1000, Importance.LOW, "How long to wait "
                  + "in milliseconds when establishing a connection to the Elasticsearch server. "
                  + "The task fails if the client fails to connect to the server in this "
                  + "interval, and will need to be restarted.",
                  group, ++order, Width.SHORT, "Connection Timeout")
          .define(READ_TIMEOUT_MS_CONFIG, Type.INT, 3000, Importance.LOW, "How long to wait in "
                  + "milliseconds for the Elasticsearch server to send a response. The task fails "
                  + "if any read operation times out, and will need to be restarted to resume "
                  + "further operations.",
                  group, ++order, Width.SHORT, "Read Timeout");
    }

    {
+8 −1
Original line number Diff line number Diff line
@@ -75,6 +75,8 @@ public class ElasticsearchSinkTask extends SinkTask {
      int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
      long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
      int connTimeout = config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
      int readTimeout = config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
@@ -91,7 +93,12 @@ public class ElasticsearchSinkTask extends SinkTask {
      } else {
        List<String> address = config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address).multiThreaded(true).build());
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address)
            .connTimeout(connTimeout)
            .readTimeout(readTimeout)
            .multiThreaded(true)
            .build()
        );
        this.client = factory.getObject();
      }

+49 −0
Original line number Diff line number Diff line
package io.confluent.connect.elasticsearch;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSinkConnectorConfigTest {

  private Map<String, String> props;

  @Before
  public void setup() {
    props = new HashMap<>();
    props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, ElasticsearchSinkTestBase.TYPE);
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
    props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
  }

  @Test
  public void testDefaultHttpTimeoutsConfig() {
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
        (Integer) 3000
    );
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
        (Integer) 1000
    );
  }

  @Test
  public void testSetHttpTimeoutsConfig() {
    props.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, "10000");
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, "15000");
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
        (Integer) 10000
    );
    Assert.assertEquals(
        config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
        (Integer) 15000
    );
  }
}