Commit cb2b4af6 authored by Liz Bennett's avatar Liz Bennett Committed by Konstantine Karantasis
Browse files

Add config that allows Elasticsearch mapper parsing errors to be ignored

parent 29d6b201
Loading
Loading
Loading
Loading
+14 −1
Original line number Diff line number Diff line
@@ -132,6 +132,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + "non-null key and a null value (i.e. Kafka tombstone records). Valid options are "
      + "'ignore', 'delete', and 'fail'.";

  public static final String IGNORE_MAPPING_ERRORS_CONFIG = "ignore.mapping.errors";
  private static final String IGNORE_MAPPING_ERRORS_DOC = "Whether to ignore "
      + "mapper_parsing_exceptions thrown by Elasticsearch upon indexing malformed documents.";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -336,7 +339,17 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        group,
        ++order,
        Width.SHORT,
        "Behavior for null-valued records");
        "Behavior for null-valued records"
    ).define(
        IGNORE_MAPPING_ERRORS_CONFIG,
        Type.BOOLEAN,
        false,
        Importance.LOW,
        IGNORE_MAPPING_ERRORS_DOC,
        group,
        ++order,
        Width.SHORT,
        "Ignore Mapping Errors mode");
  }

  public static final ConfigDef CONFIG = baseConfigDef();
+5 −1
Original line number Diff line number Diff line
@@ -95,6 +95,9 @@ public class ElasticsearchSinkTask extends SinkTask {
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
          );

      boolean ignoreMappingErrors = config.getBoolean(
          ElasticsearchSinkConnectorConfig.IGNORE_MAPPING_ERRORS_CONFIG);

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs =
          RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
@@ -127,7 +130,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setDropInvalidMessage(dropInvalidMessage)
          .setBehaviorOnNullValues(behaviorOnNullValues);
          .setBehaviorOnNullValues(behaviorOnNullValues)
          .setIgnoreMappingErrors(ignoreMappingErrors);

      writer = builder.build();
      writer.start();
+14 −3
Original line number Diff line number Diff line
@@ -52,6 +52,7 @@ public class ElasticsearchWriter {
  private final DataConverter converter;

  private final Set<String> existingMappings;
  private final boolean ignoreMappingErrors;

  ElasticsearchWriter(
      ElasticsearchClient client,
@@ -70,7 +71,8 @@ public class ElasticsearchWriter {
      int maxRetries,
      long retryBackoffMs,
      boolean dropInvalidMessage,
      BehaviorOnNullValues behaviorOnNullValues
      BehaviorOnNullValues behaviorOnNullValues,
      boolean ignoreMappingErrors
  ) {
    this.client = client;
    this.type = type;
@@ -83,6 +85,7 @@ public class ElasticsearchWriter {
    this.dropInvalidMessage = dropInvalidMessage;
    this.behaviorOnNullValues = behaviorOnNullValues;
    this.converter = new DataConverter(useCompactMapEntries, behaviorOnNullValues);
    this.ignoreMappingErrors = ignoreMappingErrors;

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -92,7 +95,8 @@ public class ElasticsearchWriter {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        ignoreMappingErrors
    );

    existingMappings = new HashSet<>();
@@ -116,6 +120,7 @@ public class ElasticsearchWriter {
    private long retryBackoffMs;
    private boolean dropInvalidMessage;
    private BehaviorOnNullValues behaviorOnNullValues = BehaviorOnNullValues.DEFAULT;
    private boolean ignoreMappingErrors;

    public Builder(ElasticsearchClient client) {
      this.client = client;
@@ -200,6 +205,11 @@ public class ElasticsearchWriter {
      return this;
    }

    /**
     * Configure whether bulk batches that fail with an Elasticsearch
     * {@code mapper_parsing_exception} should be ignored rather than
     * failing the connector task.
     *
     * @param shouldIgnore true to skip mapping-error failures, false to fail
     * @return this builder, for call chaining
     */
    public Builder setIgnoreMappingErrors(boolean shouldIgnore) {
      this.ignoreMappingErrors = shouldIgnore;
      return this;
    }

    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client,
@@ -218,7 +228,8 @@ public class ElasticsearchWriter {
          maxRetry,
          retryBackoffMs,
          dropInvalidMessage,
          behaviorOnNullValues
          behaviorOnNullValues,
          ignoreMappingErrors
      );
    }
  }
+10 −1
Original line number Diff line number Diff line
@@ -53,6 +53,7 @@ public class BulkProcessor<R, B> {
  private final long lingerMs;
  private final int maxRetries;
  private final long retryBackoffMs;
  private final boolean ignoreMappingErrors;

  private final Thread farmer;
  private final ExecutorService executor;
@@ -76,7 +77,8 @@ public class BulkProcessor<R, B> {
      int batchSize,
      long lingerMs,
      int maxRetries,
      long retryBackoffMs
      long retryBackoffMs,
      boolean ignoreMappingErrors
  ) {
    this.time = time;
    this.bulkClient = bulkClient;
@@ -85,6 +87,7 @@ public class BulkProcessor<R, B> {
    this.lingerMs = lingerMs;
    this.maxRetries = maxRetries;
    this.retryBackoffMs = retryBackoffMs;
    this.ignoreMappingErrors = ignoreMappingErrors;

    unsentRecords = new ArrayDeque<>(maxBufferedRecords);

@@ -373,6 +376,12 @@ public class BulkProcessor<R, B> {
            }
            return bulkRsp;
          }
          if (ignoreMappingErrors && bulkRsp.getErrorInfo().contains("mapper_parsing_exception")) {
            log.info("Encountered mapper_parsing_exception when execute batch {} of {} records."
                    + " Ignoring. {}",
                batchId, batch.size(), bulkRsp.getErrorInfo());
            return bulkRsp;
          }
          retriable = bulkRsp.isRetriable();
          throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
        } catch (Exception e) {
+97 −6
Original line number Diff line number Diff line
@@ -102,6 +102,7 @@ public class BulkProcessorTest {
    final int lingerMs = 5;
    final int maxRetries = 0;
    final int retryBackoffMs = 0;
    final boolean ignoreMappingErrors = false;

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -111,7 +112,8 @@ public class BulkProcessorTest {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        ignoreMappingErrors
    );

    final int addTimeoutMs = 10;
@@ -144,6 +146,7 @@ public class BulkProcessorTest {
    final int lingerMs = 100000; // super high on purpose to make sure flush is what's causing the request
    final int maxRetries = 0;
    final int retryBackoffMs = 0;
    final boolean ignoreMappingErrors = false;

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -153,7 +156,8 @@ public class BulkProcessorTest {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        ignoreMappingErrors
    );

    client.expect(Arrays.asList(1, 2, 3), BulkResponse.success());
@@ -179,6 +183,7 @@ public class BulkProcessorTest {
    final int lingerMs = 10;
    final int maxRetries = 0;
    final int retryBackoffMs = 0;
    final boolean ignoreMappingErrors = false;

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -188,7 +193,8 @@ public class BulkProcessorTest {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        ignoreMappingErrors
    );

    final int addTimeoutMs = 10;
@@ -210,6 +216,7 @@ public class BulkProcessorTest {
    final int lingerMs = 5;
    final int maxRetries = 3;
    final int retryBackoffMs = 1;
    final boolean ignoreMappingErrors = false;

    client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retiable error"));
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retriable error again"));
@@ -223,7 +230,8 @@ public class BulkProcessorTest {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        ignoreMappingErrors
    );

    final int addTimeoutMs = 10;
@@ -255,7 +263,8 @@ public class BulkProcessorTest {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        false
    );

    final int addTimeoutMs = 10;
@@ -278,6 +287,7 @@ public class BulkProcessorTest {
    final int lingerMs = 5;
    final int maxRetries = 3;
    final int retryBackoffMs = 1;
    final boolean ignoreMappingErrors = false;

    final String errorInfo = "an unretriable error";
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, errorInfo));
@@ -290,7 +300,8 @@ public class BulkProcessorTest {
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
        retryBackoffMs,
        ignoreMappingErrors
    );

    final int addTimeoutMs = 10;
@@ -305,4 +316,84 @@ public class BulkProcessorTest {
    }
  }

  /**
   * When ignore.mapping.errors is disabled, a non-retriable bulk failure whose
   * error info contains a mapper_parsing_exception must surface to the caller
   * as a ConnectException carrying that error info.
   */
  @Test
  public void ignoreMappingFalse() throws InterruptedException {
    final String errorInfo = " [{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse\"," +
        "\"caused_by\":{\"type\":\"illegal_argument_exception\",\"reason\":\"object\n" +
        " field starting or ending with a [.] makes object resolution ambiguous: [avjpz{{.}}wjzse{{..}}gal9d]\"}}]";
    // Queue a single non-retriable failure for the two-record batch.
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, errorInfo));

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        client,
        100,   // maxBufferedRecords
        5,     // maxInFlightBatches
        2,     // batchSize
        5,     // lingerMs
        3,     // maxRetries
        1,     // retryBackoffMs
        false  // ignoreMappingErrors: mapping failures are NOT swallowed
    );

    bulkProcessor.start();
    bulkProcessor.add(42, 1);
    bulkProcessor.add(43, 1);

    try {
      bulkProcessor.flush(10);
      fail();
    } catch (ConnectException e) {
      // expected: the mapping failure propagates with its error info
      assertTrue(e.getMessage().contains("mapper_parsing_exception"));
    }
  }

  /**
   * When ignore.mapping.errors is enabled, a non-retriable bulk failure whose
   * error info contains a mapper_parsing_exception is swallowed: flush must
   * complete without throwing.
   */
  @Test
  public void ignoreMappingTrue() throws InterruptedException {
    final String errorInfo = " [{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse\"," +
        "\"caused_by\":{\"type\":\"illegal_argument_exception\",\"reason\":\"object\n" +
        " field starting or ending with a [.] makes object resolution ambiguous: [avjpz{{.}}wjzse{{..}}gal9d]\"}}]";
    // Queue a single non-retriable failure for the two-record batch.
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, errorInfo));

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        client,
        100,  // maxBufferedRecords
        5,    // maxInFlightBatches
        2,    // batchSize
        5,    // lingerMs
        3,    // maxRetries
        1,    // retryBackoffMs
        true  // ignoreMappingErrors: mapping failures ARE swallowed
    );

    bulkProcessor.start();
    bulkProcessor.add(42, 1);
    bulkProcessor.add(43, 1);

    try {
      bulkProcessor.flush(10);
    } catch (ConnectException e) {
      // a mapping error must not fail the flush in this mode
      fail(e.getMessage());
    }
  }
}