Commit dc56e42d authored by Liz Bennett's avatar Liz Bennett Committed by Konstantine Karantasis
Browse files

Fixes from code review of behavior on malformed doc config feature

parent 539bad9b
Loading
Loading
Loading
Loading
+14 −18
Original line number Diff line number Diff line
@@ -378,8 +378,7 @@ public class BulkProcessor<R, B> {
                      batchId, batch.size(), attempts, maxAttempts);
            }
            return bulkRsp;
          } else {
            if (responseContainsMalformedDocError(bulkRsp)) {
          } else if (responseContainsMalformedDocError(bulkRsp)) {
            retriable = bulkRsp.isRetriable();
            handleMalformedDoc(bulkRsp);
            return bulkRsp;
@@ -388,7 +387,6 @@ public class BulkProcessor<R, B> {
            retriable = bulkRsp.isRetriable();
            throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
          }
          }
        } catch (Exception e) {
          if (retriable && attempts < maxAttempts) {
            long sleepTimeMs = RetryUtil.computeRandomRetryWaitTimeInMillis(retryAttempts,
@@ -410,17 +408,15 @@ public class BulkProcessor<R, B> {
      // if the elasticsearch request failed because of a malformed document,
      // the behavior is configurable.
      switch (behaviorOnMalformedDoc) {
        case WARN:
          log.warn("Encountered an illegal document error when executing batch {} of {}"
                  + " records. Error was {}. Will not index record.",
              batchId, batch.size(), bulkRsp.getErrorInfo());
          return;
        case IGNORE:
          if (log.isDebugEnabled()) {
          log.debug("Encountered an illegal document error when executing batch {} of {}"
                    + " records. Ignoring. Error was {}",
                  + " records. Ignoring and will not index record. Error was {}",
              batchId, batch.size(), bulkRsp.getErrorInfo());
          return;
        case WARN:
          log.warn("Encountered an illegal document error when executing batch {} of {}"
                  + " records. Ignoring and will not index record. Error was {}",
              batchId, batch.size(), bulkRsp.getErrorInfo());
          }
          return;
        case FAIL:
          log.error("Encountered an illegal document error when executing batch {} of {}"
@@ -479,7 +475,7 @@ public class BulkProcessor<R, B> {

    public static final BehaviorOnMalformedDoc DEFAULT = FAIL;

    // Want values for "behavior.on.null.values" property to be case-insensitive
    // Want values for "behavior.on.malformed.doc" property to be case-insensitive
    public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
      private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names());

+46 −35
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@
package io.confluent.connect.elasticsearch.bulk;

import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.After;
import org.junit.Before;
@@ -24,6 +25,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@@ -107,7 +109,7 @@ public class BulkProcessorTest {
    final BehaviorOnMalformedDoc behaviorOnMalformedDoc = BehaviorOnMalformedDoc.DEFAULT;

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        Time.SYSTEM,
        client,
        maxBufferedRecords,
        maxInFlightBatches,
@@ -151,7 +153,7 @@ public class BulkProcessorTest {
    final BehaviorOnMalformedDoc behaviorOnMalformedDoc = BehaviorOnMalformedDoc.DEFAULT;

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        Time.SYSTEM,
        client,
        maxBufferedRecords,
        maxInFlightBatches,
@@ -188,7 +190,7 @@ public class BulkProcessorTest {
    final BehaviorOnMalformedDoc behaviorOnMalformedDoc = BehaviorOnMalformedDoc.DEFAULT;

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        Time.SYSTEM,
        client,
        maxBufferedRecords,
        maxInFlightBatches,
@@ -225,7 +227,7 @@ public class BulkProcessorTest {
    client.expect(Arrays.asList(42, 43), BulkResponse.success());

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        Time.SYSTEM,
        client,
        maxBufferedRecords,
        maxInFlightBatches,
@@ -259,7 +261,7 @@ public class BulkProcessorTest {
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, errorInfo));

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        Time.SYSTEM,
        client,
        maxBufferedRecords,
        maxInFlightBatches,
@@ -296,7 +298,7 @@ public class BulkProcessorTest {
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, errorInfo));

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        Time.SYSTEM,
        client,
        maxBufferedRecords,
        maxInFlightBatches,
@@ -335,7 +337,7 @@ public class BulkProcessorTest {
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, errorInfo));

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        Time.SYSTEM,
        client,
        maxBufferedRecords,
        maxInFlightBatches,
@@ -361,22 +363,31 @@ public class BulkProcessorTest {
  }

  @Test
  public void ignoreOnMalformedDoc() throws InterruptedException {
  public void ignoreOrWarnOnMalformedDoc() throws InterruptedException {
    final int maxBufferedRecords = 100;
    final int maxInFlightBatches = 5;
    final int batchSize = 2;
    final int lingerMs = 5;
    final int maxRetries = 3;
    final int retryBackoffMs = 1;
    final BehaviorOnMalformedDoc behaviorOnMalformedDoc = BehaviorOnMalformedDoc.IGNORE;

    // Test both IGNORE and WARN options
    // There is no difference in logic between IGNORE and WARN, except for the logging.
    // Test to ensure they both work the same logically
    final List<BehaviorOnMalformedDoc> behaviorsToTest = new ArrayList<BehaviorOnMalformedDoc>() {{
      add(BehaviorOnMalformedDoc.WARN);
      add(BehaviorOnMalformedDoc.IGNORE);
    }};

    for(BehaviorOnMalformedDoc behaviorOnMalformedDoc : behaviorsToTest)
    {
      final String errorInfo = " [{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse\"," +
          "\"caused_by\":{\"type\":\"illegal_argument_exception\",\"reason\":\"object\n" +
          " field starting or ending with a [.] makes object resolution ambiguous: [avjpz{{.}}wjzse{{..}}gal9d]\"}}]";
      client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, errorInfo));

      final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
          Time.SYSTEM,
          client,
          maxBufferedRecords,
          maxInFlightBatches,
@@ -392,7 +403,6 @@ public class BulkProcessorTest {
      bulkProcessor.add(42, 1);
      bulkProcessor.add(43, 1);


      try {
        bulkProcessor.flush(10);
      } catch (ConnectException e) {
@@ -400,3 +410,4 @@ public class BulkProcessorTest {
      }
    }
  }
}