Commit e22337bd authored by Liz Bennett, committed by Konstantine Karantasis
Browse files

Implement feedback from code review

parent 904dc48e
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -143,3 +143,11 @@ Data Conversion
  * Default: ignore
  * Valid Values: [ignore, delete, fail]
  * Importance: low

``behavior.on.malformed.documents``
  How to handle records that Elasticsearch rejects due to some malformation of the document itself, such as an index mapping conflict or a field name containing illegal characters. Valid options are 'ignore', 'warn', and 'fail'.

  * Type: string
  * Default: fail
  * Valid Values: [ignore, warn, fail]
  * Importance: low
 No newline at end of file
+12 −8
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Width;
import java.util.Map;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

@@ -132,9 +133,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + "non-null key and a null value (i.e. Kafka tombstone records). Valid options are "
      + "'ignore', 'delete', and 'fail'.";

  public static final String IGNORE_MAPPING_ERRORS_CONFIG = "ignore.mapping.errors";
  private static final String IGNORE_MAPPING_ERRORS_DOC = "Whether to ignore "
      + "mapper_parsing_exceptions thrown by Elasticsearch upon indexing malformed documents.";
  public static final String BEHAVIOR_ON_MALFORMED_DOCS_CONFIG = "behavior.on.malformed.documents";
  private static final String BEHAVIOR_ON_MALFORMED_DOCS_DOC = "How to handle records that "
      + "Elasticsearch rejects due to some malformation of the document itself, such as an index"
      + " mapping conflict or a field name containing illegal characters. Valid options are "
      + "'ignore', 'warn', and 'fail'.";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -341,15 +344,16 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        Width.SHORT,
        "Behavior for null-valued records"
    ).define(
        IGNORE_MAPPING_ERRORS_CONFIG,
        Type.BOOLEAN,
        false,
        BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
        Type.STRING,
        BehaviorOnMalformedDoc.DEFAULT.toString(),
        BehaviorOnMalformedDoc.VALIDATOR,
        Importance.LOW,
        IGNORE_MAPPING_ERRORS_DOC,
        BEHAVIOR_ON_MALFORMED_DOCS_DOC,
        group,
        ++order,
        Width.SHORT,
        "Ignore Mapping Errors mode");
        "Behavior on malformed documents");
  }

  public static final ConfigDef CONFIG = baseConfigDef();
+6 −3
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -95,8 +96,10 @@ public class ElasticsearchSinkTask extends SinkTask {
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
          );

      boolean ignoreMappingErrors = config.getBoolean(
          ElasticsearchSinkConnectorConfig.IGNORE_MAPPING_ERRORS_CONFIG);
      BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc =
          BulkProcessor.BehaviorOnMalformedDoc.forValue(
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG)
          );

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs =
@@ -131,7 +134,7 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setMaxRetry(maxRetry)
          .setDropInvalidMessage(dropInvalidMessage)
          .setBehaviorOnNullValues(behaviorOnNullValues)
          .setIgnoreMappingErrors(ignoreMappingErrors);
          .setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);

      writer = builder.build();
      writer.start();
+9 −8
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ import java.util.Objects;
import java.util.Set;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;

public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);
@@ -52,7 +53,7 @@ public class ElasticsearchWriter {
  private final DataConverter converter;

  private final Set<String> existingMappings;
  private final boolean ignoreMappingErrors;
  private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;

  ElasticsearchWriter(
      ElasticsearchClient client,
@@ -72,7 +73,7 @@ public class ElasticsearchWriter {
      long retryBackoffMs,
      boolean dropInvalidMessage,
      BehaviorOnNullValues behaviorOnNullValues,
      boolean ignoreMappingErrors
      BehaviorOnMalformedDoc behaviorOnMalformedDoc
  ) {
    this.client = client;
    this.type = type;
@@ -85,7 +86,7 @@ public class ElasticsearchWriter {
    this.dropInvalidMessage = dropInvalidMessage;
    this.behaviorOnNullValues = behaviorOnNullValues;
    this.converter = new DataConverter(useCompactMapEntries, behaviorOnNullValues);
    this.ignoreMappingErrors = ignoreMappingErrors;
    this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -96,7 +97,7 @@ public class ElasticsearchWriter {
        lingerMs,
        maxRetries,
        retryBackoffMs,
        ignoreMappingErrors
        behaviorOnMalformedDoc
    );

    existingMappings = new HashSet<>();
@@ -120,7 +121,7 @@ public class ElasticsearchWriter {
    private long retryBackoffMs;
    private boolean dropInvalidMessage;
    private BehaviorOnNullValues behaviorOnNullValues = BehaviorOnNullValues.DEFAULT;
    private boolean ignoreMappingErrors;
    private BehaviorOnMalformedDoc behaviorOnMalformedDoc;

    public Builder(ElasticsearchClient client) {
      this.client = client;
@@ -205,8 +206,8 @@ public class ElasticsearchWriter {
      return this;
    }

    public Builder setIgnoreMappingErrors(boolean ignoreMappingErrors) {
      this.ignoreMappingErrors = ignoreMappingErrors;
    /**
     * Set how the writer reacts when Elasticsearch rejects a document as malformed
     * (e.g. a mapping conflict or an illegal field name): ignore, warn, or fail.
     *
     * @param behaviorOnMalformedDoc the behavior to apply; not validated here —
     *        assumed non-null by the time {@code build()} is called (TODO confirm)
     * @return this builder, for chaining
     */
    public Builder setBehaviorOnMalformedDoc(BehaviorOnMalformedDoc behaviorOnMalformedDoc) {
      this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
      return this;
    }

@@ -229,7 +230,7 @@ public class ElasticsearchWriter {
          retryBackoffMs,
          dropInvalidMessage,
          behaviorOnNullValues,
          ignoreMappingErrors
          behaviorOnMalformedDoc
      );
    }
  }
+88 −12
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
package io.confluent.connect.elasticsearch.bulk;

import io.confluent.connect.elasticsearch.RetryUtil;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@@ -26,6 +27,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -53,7 +55,7 @@ public class BulkProcessor<R, B> {
  private final long lingerMs;
  private final int maxRetries;
  private final long retryBackoffMs;
  private final boolean ignoreMappingErrors;
  private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;

  private final Thread farmer;
  private final ExecutorService executor;
@@ -78,7 +80,7 @@ public class BulkProcessor<R, B> {
      long lingerMs,
      int maxRetries,
      long retryBackoffMs,
      boolean ignoreMappingErrors
      BehaviorOnMalformedDoc behaviorOnMalformedDoc
  ) {
    this.time = time;
    this.bulkClient = bulkClient;
@@ -87,7 +89,7 @@ public class BulkProcessor<R, B> {
    this.lingerMs = lingerMs;
    this.maxRetries = maxRetries;
    this.retryBackoffMs = retryBackoffMs;
    this.ignoreMappingErrors = ignoreMappingErrors;
    this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;

    unsentRecords = new ArrayDeque<>(maxBufferedRecords);

@@ -375,17 +377,17 @@ public class BulkProcessor<R, B> {
                      batchId, batch.size(), attempts, maxAttempts);
            }
            return bulkRsp;
          }
          if (ignoreMappingErrors) {
            if (responseContainsMappingError(bulkRsp)) {
              log.info("Encountered mapper_parsing_exception when executing batch {} of {} records."
                      + " Ignoring. Error was {}",
                  batchId, batch.size(), bulkRsp.getErrorInfo());
          } else {
            if (responseContainsMalformedDocError(bulkRsp)) {
              retriable = bulkRsp.isRetriable();
              handleMalformedDoc(bulkRsp);
              return bulkRsp;
            }
          }
            } else {
              // for all other errors, throw the error up
              retriable = bulkRsp.isRetriable();
              throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
            }
          }
        } catch (Exception e) {
          if (retriable && attempts < maxAttempts) {
            long sleepTimeMs = RetryUtil.computeRandomRetryWaitTimeInMillis(retryAttempts,
@@ -402,9 +404,36 @@ public class BulkProcessor<R, B> {
        }
      }
    }

    /**
     * Apply the configured {@link BehaviorOnMalformedDoc} to a bulk response that
     * failed because of a malformed document (see responseContainsMalformedDocError):
     * WARN logs at warn level and drops the batch, IGNORE drops it silently
     * (debug log only), FAIL throws a ConnectException.
     *
     * <p>Note: the matched error may be either a mapper_parsing_exception or an
     * illegal_argument_exception, so the log message reports the actual error info
     * rather than naming a single exception type.
     *
     * @param bulkRsp the failed bulk response; its error info is logged or rethrown
     * @throws ConnectException when the behavior is FAIL
     */
    private void handleMalformedDoc(BulkResponse bulkRsp) {
      // if the elasticsearch request failed because of a malformed document,
      // the behavior is configurable.
      switch (behaviorOnMalformedDoc) {
        case WARN:
          log.warn("Encountered a malformed document error when executing batch {} of {}"
                  + " records. Ignoring and will not index record. Error was {}",
              batchId, batch.size(), bulkRsp.getErrorInfo());
          return;
        case IGNORE:
          if (log.isDebugEnabled()) {
            log.debug("Encountered a malformed document error when executing batch {} of {}"
                    + " records. Ignoring and will not index record. Error was {}",
                batchId, batch.size(), bulkRsp.getErrorInfo());
          }
          return;
        case FAIL:
          throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
        default:
          // defensive: unreachable unless a new enum constant is added without handling
          throw new RuntimeException(String.format(
              "Unknown value for %s enum: %s",
              BehaviorOnMalformedDoc.class.getSimpleName(),
              behaviorOnMalformedDoc
          ));
      }
    }
  }

  private boolean responseContainsMappingError(BulkResponse bulkRsp) {
  /**
   * Whether the bulk response's error info indicates a malformed document —
   * i.e. a parsing/mapping failure or an illegal argument — as opposed to a
   * transient cluster-side problem.
   *
   * @param bulkRsp the failed bulk response to inspect
   * @return true when the error info names a malformed-document error
   */
  private boolean responseContainsMalformedDocError(BulkResponse bulkRsp) {
    final String errorInfo = bulkRsp.getErrorInfo();
    return errorInfo.contains("mapper_parsing_exception")
        || errorInfo.contains("illegal_argument_exception");
  }
@@ -435,4 +464,51 @@ public class BulkProcessor<R, B> {
    }
  }

  /**
   * How to react when Elasticsearch rejects a document as malformed:
   * IGNORE drops the record silently, WARN drops it with a warning log,
   * FAIL (the default) raises an error. Config values are matched
   * case-insensitively and rendered in lowercase.
   */
  public enum BehaviorOnMalformedDoc {
    IGNORE,
    WARN,
    FAIL;

    // Default behavior when the property is not set.
    public static final BehaviorOnMalformedDoc DEFAULT = FAIL;

    // Want values for "behavior.on.malformed.documents" property to be case-insensitive
    public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
      private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names());

      @Override
      public void ensureValid(String name, Object value) {
        // Lowercase string input before delegating, since names() yields lowercase values.
        if (value instanceof String) {
          value = ((String) value).toLowerCase(Locale.ROOT);
        }
        validator.ensureValid(name, value);
      }

      // Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly
      @Override
      public String toString() {
        return validator.toString();
      }

    };

    /**
     * The lowercase names of all constants, in declaration order — used to
     * build the config validator above.
     */
    public static String[] names() {
      BehaviorOnMalformedDoc[] behaviors = values();
      String[] result = new String[behaviors.length];

      for (int i = 0; i < behaviors.length; i++) {
        result[i] = behaviors[i].toString();
      }

      return result;
    }

    /**
     * Parse a config value (any case) into the matching constant.
     * Throws IllegalArgumentException for unknown values, per Enum.valueOf.
     */
    public static BehaviorOnMalformedDoc forValue(String value) {
      return valueOf(value.toUpperCase(Locale.ROOT));
    }

    // Lowercase rendering so config defaults/docs show e.g. "fail" not "FAIL".
    @Override
    public String toString() {
      return name().toLowerCase(Locale.ROOT);
    }
  }
}
Loading