Commit 30f6a2b5 authored by Ewen Cheslack-Postava's avatar Ewen Cheslack-Postava
Browse files

Merge remote-tracking branch 'origin/master' into 0.10.0.0

parents 33d3af7e ab5f0159
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
Configuration Options
---------------------

``connection.url``
  The URL to connect to Elasticsearch.

@@ -25,7 +26,7 @@ Configuration Options
  The number of requests to process as a batch when writing to Elasticsearch.

  * Type: int
  * Default: 10000
  * Default: 2000
  * Importance: medium

``max.in.flight.requests``
@@ -53,10 +54,10 @@ Configuration Options
  Approximately the max number of records each task will buffer. This config controls the memory usage for each task. When the number of buffered records is larger than this value, the partitions assigned to this task will be paused.

  * Type: long
  * Default: 100000
  * Default: 20000
  * Importance: low

``max.retry``
``max.retries``
  The max allowed number of retries. Allowing retries will potentially change the ordering of records.

  * Type: int
+11 −7
Original line number Diff line number Diff line
@@ -27,15 +27,15 @@ import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

@@ -47,10 +47,15 @@ import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConst

public class DataConverter {

  private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
  private static final Converter JSON_CONVERTER;
  static {
    JSON_CONVERTER = new JsonConverter();
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  /**
   * Convert the key to the string representation.
   *
   * @param key The key of a SinkRecord.
   * @param keySchema The key schema.
   * @return The string representation of the key.
@@ -83,9 +88,9 @@ public class DataConverter {

  /**
   * Convert a SinkRecord to an IndexRequest.
   *
   * @param record The SinkRecord to be converted.
   * @param client The client to connect to Elasticsearch.
   * @param converter The converter to use to convert the value to JSON.
   * @param ignoreKey Whether to ignore the key during indexing.
   * @param ignoreSchema Whether to ignore the schema during indexing.
   * @param topicConfigs The map of per topic configs.
@@ -97,7 +102,6 @@ public class DataConverter {
      SinkRecord record,
      String type,
      JestClient client,
      Converter converter,
      boolean ignoreKey,
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
@@ -156,7 +160,7 @@ public class DataConverter {
      newValue = value;
    }

    byte[] json = converter.fromConnectData(topic, newSchema, newValue);
    byte[] json = JSON_CONVERTER.fromConnectData(topic, newSchema, newValue);
    return new ESRequest(index, type, id, json);
  }

+7 −7
Original line number Diff line number Diff line
@@ -67,12 +67,12 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final String MAX_BUFFERED_RECORDS_DOC =
      "Approximately the max number of records each task will buffer. This config controls the memory usage for each task. When the number of "
      + "buffered records is larger than this value, the partitions assigned to this task will be paused.";
  private static final long MAX_BUFFERED_RECORDS_DEFAULT = 100000;
  private static final long MAX_BUFFERED_RECORDS_DEFAULT = 20000;
  private static final String MAX_BUFFERED_RECORDS_DISPLAY = "Max Number of Records to Buffer";

  public static final String BATCH_SIZE_CONFIG = "batch.size";
  private static final String BATCH_SIZE_DOC = "The number of requests to process as a batch when writing to Elasticsearch.";
  private static final int BATCH_SIZE_DEFAULT = 10000;
  private static final int BATCH_SIZE_DEFAULT = 2000;
  private static final String BATCH_SIZE_DISPLAY = "Batch Size";

  public static final String LINGER_MS_CONFIG = "linger.ms";
@@ -99,10 +99,10 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final long RETRY_BACKOFF_MS_DEFAULT = 100L;
  private static final String RETRY_BACKOFF_MS_DISPLAY = "Retry Backoff (ms)";

  public static final String MAX_RETRY_CONFIG = "max.retry";
  private static final String MAX_RETRY_DOC = "The max allowed number of retries. Allowing retries will potentially change the ordering of records.";
  private static final int MAX_RETRY_DEFAULT = 5;
  private static final String MAX_RETRY_DISPLAY = "Max Retry";
  public static final String MAX_RETRIES_CONFIG = "max.retries";
  private static final String MAX_RETRIES_DOC = "The max allowed number of retries. Allowing retries will potentially change the ordering of records.";
  private static final int MAX_RETRIES_DEFAULT = 5;
  private static final String MAX_RETRIES_DISPLAY = "Max Retries";

  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  private static final String SCHEMA_IGNORE_DOC =
@@ -133,7 +133,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        .define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, TOPIC_SCHEMA_IGNORE_DEFAULT, Importance.LOW, TOPIC_SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 9, Width.LONG, TOPIC_SCHEMA_IGNORE_DISPLAY)
        .define(LINGER_MS_CONFIG, Type.LONG, LINGER_MS_DEFAULT, Importance.LOW, LINGER_MS_DOC, CONNECTOR_GROUP, 10, Width.SHORT, LINGER_MS_DISPLAY)
        .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, RETRY_BACKOFF_MS_DEFAULT, Importance.LOW, RETRY_BACKOFF_MS_DOC, CONNECTOR_GROUP, 11, Width.SHORT, RETRY_BACKOFF_MS_DISPLAY)
        .define(MAX_RETRY_CONFIG, Type.INT, MAX_RETRY_DEFAULT, Importance.LOW, MAX_RETRY_DOC, CONNECTOR_GROUP, 12, Width.SHORT, MAX_RETRY_DISPLAY)
        .define(MAX_RETRIES_CONFIG, Type.INT, MAX_RETRIES_DEFAULT, Importance.LOW, MAX_RETRIES_DOC, CONNECTOR_GROUP, 12, Width.SHORT, MAX_RETRIES_DISPLAY)
        .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.LOW, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 13, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
        .define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.LOW, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 14, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY);
  }
+2 −17
Original line number Diff line number Diff line
@@ -20,10 +20,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,15 +42,6 @@ public class ElasticsearchSinkTask extends SinkTask {
  private ElasticsearchWriter writer;
  private JestClient client;
  private ElasticsearchWriter.Builder builder;
  private static Converter converter;

  static {
    // Config the JsonConverter
    converter = new JsonConverter();
    Map<String, String> configs = new HashMap<>();
    configs.put("schemas.enable", "false");
    converter.configure(configs, false);
  }

  @Override
  public String version() {
@@ -85,7 +74,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      long lingerMs = config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
      int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
      long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRY_CONFIG);
      int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);

      if (client != null) {
        this.client = client;
@@ -108,8 +97,7 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setContext(context)
          .setConverter(converter);
          .setContext(context);

    } catch (ConfigException e) {
      throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e);
@@ -180,7 +168,4 @@ public class ElasticsearchSinkTask extends SinkTask {
    return topicConfigMap;
  }

  public static Converter getConverter() {
    return converter;
  }
}
+8 −24
Original line number Diff line number Diff line
@@ -18,9 +18,9 @@ package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -57,8 +57,6 @@ import io.searchbox.indices.IndicesExists;
public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);

  private final Converter converter;

  private final JestClient client;
  private final BulkProcessor bulkProcessor;
  private final String type;
@@ -97,8 +95,7 @@ public class ElasticsearchWriter {
      long lingerMs,
      int maxRetry,
      long retryBackoffMs,
      SinkTaskContext context,
      Converter converter) {
      SinkTaskContext context) {

    this.client = client;
    this.type = type;
@@ -119,9 +116,6 @@ public class ElasticsearchWriter {
    // create index if needed.
    createIndices(topicConfigs);

    // Config the JsonConverter
    this.converter = converter;

    // Start the BulkProcessor
    bulkProcessor = new BulkProcessor(new HttpClient(client), maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, createDefaultListener());
    bulkProcessor.start();
@@ -144,7 +138,6 @@ public class ElasticsearchWriter {
    private int maxRetry;
    private long retryBackoffMs;
    private SinkTaskContext context;
    private Converter converter = ElasticsearchSinkTask.getConverter();

    /**
     * Constructor of ElasticsearchWriter Builder.
@@ -276,34 +269,25 @@ public class ElasticsearchWriter {
      return this;
    }

    /**
     * Set the converter.
     * @param converter The converter to use.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setConverter(Converter converter) {
      this.converter = converter;
      return this;
    }

    /**
     * Build the ElasticsearchWriter.
     * @return an instance of ElasticsearchWriter.
     */
    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context, converter);
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context);
    }
  }

  public void write(Collection<SinkRecord> records) {
    if (bulkProcessor.getException() != null) {
      throw new ConnectException("BulkProcessor fails with non reriable exception.", bulkProcessor.getException());
      throw new ConnectException("BulkProcessor failed with non-retriable exception", bulkProcessor.getException());
    }
    if (bulkProcessor.getTotalBufferedRecords() + records.size() > maxBufferedRecords) {
      throw new RetriableException("Exceeded max number of buffered records: " + maxBufferedRecords);
    }
    bulkProcessor.exceedMaxBufferedRecords(maxBufferedRecords, records.size());

    for (SinkRecord record: records) {
      ESRequest request = DataConverter.convertRecord(record, type, client, converter, ignoreKey, ignoreSchema, topicConfigs, mappings);
      ESRequest request = DataConverter.convertRecord(record, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(request);
    }
  }
Loading