Unverified commit 400e7439, authored by Konstantine Karantasis and committed via GitHub
Browse files

Merge pull request #148 from stitchfix/ignore-mapper-errors

Add config that allows Elasticsearch mapper parsing errors to be ignored
parents 29d6b201 ea695c7c
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -143,3 +143,11 @@ Data Conversion
  * Default: ignore
  * Valid Values: [ignore, delete, fail]
  * Importance: low

``behavior.on.malformed.documents``
  How to handle records that Elasticsearch rejects due to some malformation of the document itself, such as an index mapping conflict or a field name containing illegal characters. Valid options are 'ignore', 'warn', and 'fail'.

  * Type: string
  * Default: fail
  * Valid Values: [ignore, warn, fail]
  * Importance: low
 No newline at end of file
+18 −1
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Width;
import java.util.Map;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

@@ -132,6 +133,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + "non-null key and a null value (i.e. Kafka tombstone records). Valid options are "
      + "'ignore', 'delete', and 'fail'.";

  public static final String BEHAVIOR_ON_MALFORMED_DOCS_CONFIG = "behavior.on.malformed.documents";
  private static final String BEHAVIOR_ON_MALFORMED_DOCS_DOC = "How to handle records that "
      + "Elasticsearch rejects due to some malformation of the document itself, such as an index"
      + " mapping conflict or a field name containing illegal characters. Valid options are "
      + "'ignore', 'warn', and 'fail'.";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -336,7 +342,18 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        group,
        ++order,
        Width.SHORT,
        "Behavior for null-valued records");
        "Behavior for null-valued records"
    ).define(
        BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
        Type.STRING,
        BehaviorOnMalformedDoc.DEFAULT.toString(),
        BehaviorOnMalformedDoc.VALIDATOR,
        Importance.LOW,
        BEHAVIOR_ON_MALFORMED_DOCS_DOC,
        group,
        ++order,
        Width.SHORT,
        "Behavior on malformed documents");
  }

  public static final ConfigDef CONFIG = baseConfigDef();
+8 −1
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -95,6 +96,11 @@ public class ElasticsearchSinkTask extends SinkTask {
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
          );

      BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc =
          BulkProcessor.BehaviorOnMalformedDoc.forValue(
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG)
          );

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs =
          RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
@@ -127,7 +133,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setDropInvalidMessage(dropInvalidMessage)
          .setBehaviorOnNullValues(behaviorOnNullValues);
          .setBehaviorOnNullValues(behaviorOnNullValues)
          .setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);

      writer = builder.build();
      writer.start();
+15 −3
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ import java.util.Objects;
import java.util.Set;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;

public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);
@@ -52,6 +53,7 @@ public class ElasticsearchWriter {
  private final DataConverter converter;

  private final Set<String> existingMappings;
  private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;

  ElasticsearchWriter(
      ElasticsearchClient client,
@@ -70,7 +72,8 @@ public class ElasticsearchWriter {
      int maxRetries,
      long retryBackoffMs,
      boolean dropInvalidMessage,
      BehaviorOnNullValues behaviorOnNullValues
      BehaviorOnNullValues behaviorOnNullValues,
      BehaviorOnMalformedDoc behaviorOnMalformedDoc
  ) {
    this.client = client;
    this.type = type;
@@ -83,6 +86,7 @@ public class ElasticsearchWriter {
    this.dropInvalidMessage = dropInvalidMessage;
    this.behaviorOnNullValues = behaviorOnNullValues;
    this.converter = new DataConverter(useCompactMapEntries, behaviorOnNullValues);
    this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -92,7 +96,8 @@ public class ElasticsearchWriter {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        behaviorOnMalformedDoc
    );

    existingMappings = new HashSet<>();
@@ -116,6 +121,7 @@ public class ElasticsearchWriter {
    private long retryBackoffMs;
    private boolean dropInvalidMessage;
    private BehaviorOnNullValues behaviorOnNullValues = BehaviorOnNullValues.DEFAULT;
    private BehaviorOnMalformedDoc behaviorOnMalformedDoc;

    public Builder(ElasticsearchClient client) {
      this.client = client;
@@ -200,6 +206,11 @@ public class ElasticsearchWriter {
      return this;
    }

    /**
     * Sets the policy for records that Elasticsearch rejects as malformed documents.
     * The value is forwarded to the underlying {@code BulkProcessor} when the writer
     * is built.
     *
     * @param behaviorOnMalformedDoc the behavior to apply on malformed documents
     * @return this builder, for chaining
     */
    public Builder setBehaviorOnMalformedDoc(BehaviorOnMalformedDoc behaviorOnMalformedDoc) {
      this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
      return this;
    }

    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client,
@@ -218,7 +229,8 @@ public class ElasticsearchWriter {
          maxRetry,
          retryBackoffMs,
          dropInvalidMessage,
          behaviorOnNullValues
          behaviorOnNullValues,
          behaviorOnMalformedDoc
      );
    }
  }
+99 −3
Original line number Diff line number Diff line
@@ -16,7 +16,9 @@

package io.confluent.connect.elasticsearch.bulk;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.RetryUtil;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@@ -26,6 +28,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -53,6 +56,7 @@ public class BulkProcessor<R, B> {
  private final long lingerMs;
  private final int maxRetries;
  private final long retryBackoffMs;
  private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;

  private final Thread farmer;
  private final ExecutorService executor;
@@ -76,7 +80,8 @@ public class BulkProcessor<R, B> {
      int batchSize,
      long lingerMs,
      int maxRetries,
      long retryBackoffMs
      long retryBackoffMs,
      BehaviorOnMalformedDoc behaviorOnMalformedDoc
  ) {
    this.time = time;
    this.bulkClient = bulkClient;
@@ -85,6 +90,7 @@ public class BulkProcessor<R, B> {
    this.lingerMs = lingerMs;
    this.maxRetries = maxRetries;
    this.retryBackoffMs = retryBackoffMs;
    this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;

    unsentRecords = new ArrayDeque<>(maxBufferedRecords);

@@ -372,9 +378,15 @@ public class BulkProcessor<R, B> {
                      batchId, batch.size(), attempts, maxAttempts);
            }
            return bulkRsp;
          }
          } else if (responseContainsMalformedDocError(bulkRsp)) {
            retriable = bulkRsp.isRetriable();
            handleMalformedDoc(bulkRsp);
            return bulkRsp;
          } else {
            // for all other errors, throw the error up
            retriable = bulkRsp.isRetriable();
            throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
          }
        } catch (Exception e) {
          if (retriable && attempts < maxAttempts) {
            long sleepTimeMs = RetryUtil.computeRandomRetryWaitTimeInMillis(retryAttempts,
@@ -391,6 +403,43 @@ public class BulkProcessor<R, B> {
        }
      }
    }

    /**
     * Applies the configured {@code behavior.on.malformed.documents} policy to a bulk
     * response that failed because of a malformed document (e.g. a mapping conflict).
     *
     * <p>{@code IGNORE} and {@code WARN} log (at debug and warn level respectively) and
     * return, dropping the failed batch; {@code FAIL} logs an error and throws.
     *
     * @param bulkRsp the failed bulk response; its error info is included in the logs
     * @throws ConnectException if the configured behavior is {@code FAIL}
     */
    private void handleMalformedDoc(BulkResponse bulkRsp) {
      // if the elasticsearch request failed because of a malformed document,
      // the behavior is configurable.
      switch (behaviorOnMalformedDoc) {
        case IGNORE:
          log.debug("Encountered an illegal document error when executing batch {} of {}"
                  + " records. Ignoring and will not index record. Error was {}",
              batchId, batch.size(), bulkRsp.getErrorInfo());
          return;
        case WARN:
          log.warn("Encountered an illegal document error when executing batch {} of {}"
                  + " records. Ignoring and will not index record. Error was {}",
              batchId, batch.size(), bulkRsp.getErrorInfo());
          return;
        case FAIL:
          // SLF4J substitutes '{}' placeholders only; the previous message used '%s'
          // for the configuration hint, which was emitted verbatim instead of the values.
          log.error("Encountered an illegal document error when executing batch {} of {}"
                  + " records. Error was {} (to ignore future records like this"
                  + " change the configuration property '{}' from '{}' to '{}').",
              batchId, batch.size(), bulkRsp.getErrorInfo(),
              ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
              BehaviorOnMalformedDoc.FAIL,
              BehaviorOnMalformedDoc.IGNORE);
          throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
        default:
          // Defensive: unreachable unless a new enum constant is added without a case.
          throw new RuntimeException(String.format(
              "Unknown value for %s enum: %s",
              BehaviorOnMalformedDoc.class.getSimpleName(),
              behaviorOnMalformedDoc
          ));
      }
    }
  }

  /**
   * Reports whether the bulk response failed because of a malformed document —
   * i.e. its error info mentions a mapper parsing or illegal argument exception —
   * as opposed to some other kind of failure.
   *
   * @param bulkRsp the failed bulk response to inspect
   * @return true if the error indicates a malformed document
   */
  private boolean responseContainsMalformedDocError(BulkResponse bulkRsp) {
    final String errorInfo = bulkRsp.getErrorInfo();
    return errorInfo.contains("mapper_parsing_exception")
        || errorInfo.contains("illegal_argument_exception");
  }

  private synchronized void onBatchCompletion(int batchSize) {
@@ -419,4 +468,51 @@ public class BulkProcessor<R, B> {
    }
  }

  /**
   * Policy for handling documents that Elasticsearch rejects as malformed
   * (mapper parsing or illegal-argument errors in a bulk response).
   */
  public enum BehaviorOnMalformedDoc {
    IGNORE,
    WARN,
    FAIL;

    // Default policy when the user does not set the config: fail the task.
    public static final BehaviorOnMalformedDoc DEFAULT = FAIL;

    // Want values for the "behavior.on.malformed.documents" property to be case-insensitive
    public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
      private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names());

      @Override
      public void ensureValid(String name, Object value) {
        // Lower-case string values before delegating so any casing is accepted.
        if (value instanceof String) {
          value = ((String) value).toLowerCase(Locale.ROOT);
        }
        validator.ensureValid(name, value);
      }

      // Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly
      @Override
      public String toString() {
        return validator.toString();
      }

    };

    /**
     * Returns the lower-case names of all constants, used as the valid values for
     * config validation and documentation.
     */
    public static String[] names() {
      BehaviorOnMalformedDoc[] behaviors = values();
      String[] result = new String[behaviors.length];

      for (int i = 0; i < behaviors.length; i++) {
        result[i] = behaviors[i].toString();
      }

      return result;
    }

    /**
     * Parses a (case-insensitive) config value into the matching constant.
     *
     * @param value the config string, e.g. "ignore", "warn", or "fail"
     * @return the matching constant
     */
    public static BehaviorOnMalformedDoc forValue(String value) {
      return valueOf(value.toUpperCase(Locale.ROOT));
    }

    // Lower-case form is what appears in config values and docs.
    @Override
    public String toString() {
      return name().toLowerCase(Locale.ROOT);
    }
  }
}
Loading