Commit 3913d8ae authored by Robert Yokota's avatar Robert Yokota
Browse files

Merge remote-tracking branch 'upstream/master'

parents 1a0e5f07 c9d84c5e
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -143,3 +143,11 @@ Data Conversion
  * Default: ignore
  * Valid Values: [ignore, delete, fail]
  * Importance: low

``behavior.on.malformed.documents``
  How to handle records that Elasticsearch rejects due to some malformation of the document itself, such as an index mapping conflict or a field name containing illegal characters. Valid options are 'ignore', 'warn', and 'fail'.

  * Type: string
  * Default: fail
  * Valid Values: [ignore, warn, fail]
  * Importance: low
 No newline at end of file
+1 −1
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@ producer to Elasticsearch.
**Prerequisites:**

- :ref:`Confluent Platform <installation>` is installed and services are running via the Confluent CLI. This quick start assumes that you are using the Confluent CLI, but standalone installations are also supported. By default, ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the ``confluent start`` command. For more information, see :ref:`installation_archive`.
- Elasticsearch 5.x is installed and running.
- Elasticsearch 5.x or 6.x is installed and running.

----------------------------
Add a Record to the Consumer
+18 −1
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Width;
import java.util.Map;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

@@ -132,6 +133,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + "non-null key and a null value (i.e. Kafka tombstone records). Valid options are "
      + "'ignore', 'delete', and 'fail'.";

  public static final String BEHAVIOR_ON_MALFORMED_DOCS_CONFIG = "behavior.on.malformed.documents";
  private static final String BEHAVIOR_ON_MALFORMED_DOCS_DOC = "How to handle records that "
      + "Elasticsearch rejects due to some malformation of the document itself, such as an index"
      + " mapping conflict or a field name containing illegal characters. Valid options are "
      + "'ignore', 'warn', and 'fail'.";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -336,7 +342,18 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        group,
        ++order,
        Width.SHORT,
        "Behavior for null-valued records");
        "Behavior for null-valued records"
    ).define(
        BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
        Type.STRING,
        BehaviorOnMalformedDoc.DEFAULT.toString(),
        BehaviorOnMalformedDoc.VALIDATOR,
        Importance.LOW,
        BEHAVIOR_ON_MALFORMED_DOCS_DOC,
        group,
        ++order,
        Width.SHORT,
        "Behavior on malformed documents");
  }

  public static final ConfigDef CONFIG = baseConfigDef();
+8 −1
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -95,6 +96,11 @@ public class ElasticsearchSinkTask extends SinkTask {
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
          );

      BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc =
          BulkProcessor.BehaviorOnMalformedDoc.forValue(
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG)
          );

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs =
          RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
@@ -127,7 +133,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setDropInvalidMessage(dropInvalidMessage)
          .setBehaviorOnNullValues(behaviorOnNullValues);
          .setBehaviorOnNullValues(behaviorOnNullValues)
          .setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);

      writer = builder.build();
      writer.start();
+15 −3
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ import java.util.Objects;
import java.util.Set;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;

public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);
@@ -52,6 +53,7 @@ public class ElasticsearchWriter {
  private final DataConverter converter;

  private final Set<String> existingMappings;
  private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;

  ElasticsearchWriter(
      ElasticsearchClient client,
@@ -70,7 +72,8 @@ public class ElasticsearchWriter {
      int maxRetries,
      long retryBackoffMs,
      boolean dropInvalidMessage,
      BehaviorOnNullValues behaviorOnNullValues
      BehaviorOnNullValues behaviorOnNullValues,
      BehaviorOnMalformedDoc behaviorOnMalformedDoc
  ) {
    this.client = client;
    this.type = type;
@@ -83,6 +86,7 @@ public class ElasticsearchWriter {
    this.dropInvalidMessage = dropInvalidMessage;
    this.behaviorOnNullValues = behaviorOnNullValues;
    this.converter = new DataConverter(useCompactMapEntries, behaviorOnNullValues);
    this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -92,7 +96,8 @@ public class ElasticsearchWriter {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        behaviorOnMalformedDoc
    );

    existingMappings = new HashSet<>();
@@ -116,6 +121,7 @@ public class ElasticsearchWriter {
    private long retryBackoffMs;
    private boolean dropInvalidMessage;
    private BehaviorOnNullValues behaviorOnNullValues = BehaviorOnNullValues.DEFAULT;
    private BehaviorOnMalformedDoc behaviorOnMalformedDoc;

    public Builder(ElasticsearchClient client) {
      this.client = client;
@@ -200,6 +206,11 @@ public class ElasticsearchWriter {
      return this;
    }

    public Builder setBehaviorOnMalformedDoc(BehaviorOnMalformedDoc behaviorOnMalformedDoc) {
      this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
      return this;
    }

    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client,
@@ -218,7 +229,8 @@ public class ElasticsearchWriter {
          maxRetry,
          retryBackoffMs,
          dropInvalidMessage,
          behaviorOnNullValues
          behaviorOnNullValues,
          behaviorOnMalformedDoc
      );
    }
  }
Loading