Commit 539bad9b authored by Liz Bennett's avatar Liz Bennett Committed by Konstantine Karantasis
Browse files

Improve error messages on encountering a malformed document error

parent e22337bd
Loading
Loading
Loading
Loading
+11 −3
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch.bulk;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.RetryUtil;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Time;
@@ -410,18 +411,25 @@ public class BulkProcessor<R, B> {
      // the behavior is configurable.
      switch (behaviorOnMalformedDoc) {
        case WARN:
          log.warn("Encountered mapper_parsing_exception when executing batch {} of {}"
                  + " records. Ignoring. Error was {}",
          log.warn("Encountered an illegal document error when executing batch {} of {}"
                  + " records. Error was {}. Will not index record.",
              batchId, batch.size(), bulkRsp.getErrorInfo());
          return;
        case IGNORE:
          if (log.isDebugEnabled()) {
            log.debug("Encountered mapper_parsing_exception when executing batch {} of {}"
            log.debug("Encountered an illegal document error when executing batch {} of {}"
                    + " records. Ignoring. Error was {}",
                batchId, batch.size(), bulkRsp.getErrorInfo());
          }
          return;
        case FAIL:
          log.error("Encountered an illegal document error when executing batch {} of {}"
                  + " records. Error was {} (to ignore future records like this"
                  + " change the configuration property '%s' from '%s' to '%s').",
              batchId, batch.size(), bulkRsp.getErrorInfo(),
              ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
              BehaviorOnMalformedDoc.FAIL,
              BehaviorOnMalformedDoc.IGNORE);
          throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
        default:
          throw new RuntimeException(String.format(