Unverified Commit 5d401fee authored by Chris Egerton's avatar Chris Egerton Committed by GitHub
Browse files

Merge pull request #165 from rhauch/cc-350-1097

CC-350, CC-1097: Added support for delete and null handling
parents 73bb7ba0 8d60c146
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -129,3 +129,11 @@ Data Conversion
  * Type: list
  * Default: ""
  * Importance: low

``behavior.on.null.values``
  How to handle records with a non-null key and a null value (i.e. Kafka tombstone records). Valid options are 'ignore', 'delete', and 'fail'.

  * Type: string
  * Default: ignore
  * Valid Values: [ignore, delete, fail]
  * Importance: low
 No newline at end of file
+1 −1
Original line number Diff line number Diff line
@@ -47,7 +47,7 @@ public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {
  /**
   * Build a single Jest {@code Bulk} request containing exactly one bulkable action
   * per record in the batch.
   *
   * @param batch the records to include in the bulk request; may be empty, not null
   * @return the assembled bulk request
   */
  public Bulk bulkRequest(List<IndexableRecord> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexableRecord record : batch) {
      // Each record converts itself to the appropriate bulkable action (an index
      // request for a non-null payload, a delete for a tombstone). The merged diff
      // left the old toIndexRequest() call alongside this one, which would have
      // added every record to the bulk request twice; only the new call is kept.
      builder.addAction(record.toBulkableAction());
    }
    return builder.build();
  }
+117 −9
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -38,7 +39,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
@@ -52,9 +55,23 @@ public class DataConverter {
  }

  private final boolean useCompactMapEntries;
  private final BehaviorOnNullValues behaviorOnNullValues;

  /**
   * Create a DataConverter, specifying how map entries with string keys within record
   * values should be written to JSON. Compact map entries are written as
   * <code>"entryKey": "entryValue"</code>, while the non-compact form are written as a nested
   * document such as <code>{"key": "entryKey", "value": "entryValue"}</code>. All map entries
   * with non-string keys are always written as nested documents.
   *
   * @param useCompactMapEntries true for compact map entries with string keys, or false for
   *                             the nested document form.
   * @param behaviorOnNullValues behavior for handling records with null values; may not be null
   */
  public DataConverter(boolean useCompactMapEntries, BehaviorOnNullValues behaviorOnNullValues) {
    this.useCompactMapEntries = useCompactMapEntries;
    // Fail fast on misconfiguration rather than NPE-ing later inside convertRecord(...).
    this.behaviorOnNullValues =
        Objects.requireNonNull(behaviorOnNullValues, "behaviorOnNullValues cannot be null.");
  }

  private String convertKey(Schema keySchema, Object key) {
@@ -85,6 +102,45 @@ public class DataConverter {
  }

  public IndexableRecord convertRecord(SinkRecord record, String index, String type, boolean ignoreKey, boolean ignoreSchema) {
    if (record.value() == null) {
      switch (behaviorOnNullValues) {
        case IGNORE:
          return null;
        case DELETE:
          if (record.key() == null) {
            // Since the record key is used as the ID of the index to delete and we don't have a key
            // for this record, we can't delete anything anyways, so we ignore the record.
            // We can also disregard the value of the ignoreKey parameter, since even if it's true
            // the resulting index we'd try to delete would be based solely off topic/partition/
            // offset information for the SinkRecord. Since that information is guaranteed to be
            // unique per message, we can be confident that there wouldn't be any corresponding
            // index present in ES to delete anyways.
            return null;
          }
          // Will proceed as normal, ultimately creating an IndexableRecord with a null payload
          break;
        case FAIL:
          throw new DataException(String.format(
              "Sink record with key of %s and null value encountered for topic/partition/offset "
              + "%s/%s/%s (to ignore future records like this change the configuration property "
              + "'%s' from '%s' to '%s')",
              record.key(),
              record.topic(),
              record.kafkaPartition(),
              record.kafkaOffset(),
              ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG,
              BehaviorOnNullValues.FAIL,
              BehaviorOnNullValues.IGNORE
          ));
        default:
          throw new RuntimeException(String.format(
              "Unknown value for %s enum: %s",
              BehaviorOnNullValues.class.getSimpleName(),
              behaviorOnNullValues
          ));
      }
    }

    final String id;
    if (ignoreKey) {
      id = record.topic() + "+" + String.valueOf((int) record.kafkaPartition()) + "+" + String.valueOf(record.kafkaOffset());
@@ -92,17 +148,22 @@ public class DataConverter {
      id = convertKey(record.keySchema(), record.key());
    }

    final Schema schema;
    final Object value;
    if (!ignoreSchema) {
      schema = preProcessSchema(record.valueSchema());
      value = preProcessValue(record.value(), record.valueSchema(), schema);
    final String payload;
    if (record.value() != null) {
      Schema schema = ignoreSchema
          ? record.valueSchema()
          : preProcessSchema(record.valueSchema());

      Object value = ignoreSchema
          ? record.value()
          : preProcessValue(record.value(), record.valueSchema(), schema);

      byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);
      payload = new String(rawJsonPayload, StandardCharsets.UTF_8);
    } else {
      schema = record.valueSchema();
      value = record.value();
      payload = null;
    }

    final String payload = new String(JSON_CONVERTER.fromConnectData(record.topic(), schema, value), StandardCharsets.UTF_8);
    final Long version = ignoreKey ? null : record.kafkaOffset();
    return new IndexableRecord(new Key(index, type, id), payload, version);
  }
@@ -242,4 +303,51 @@ public class DataConverter {
        return value;
    }
  }
  /**
   * The supported behaviors for sink records that have a null value (Kafka tombstones):
   * silently skip them, turn them into Elasticsearch deletes, or fail the task.
   */
  public enum BehaviorOnNullValues {
    IGNORE,
    DELETE,
    FAIL;

    /** The behavior used when none is configured explicitly. */
    public static final BehaviorOnNullValues DEFAULT = IGNORE;

    /**
     * Validator for the "behavior.on.null.values" property that accepts the enum names
     * case-insensitively by lowercasing string input before delegating.
     */
    public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
      private final ConfigDef.ValidString delegate = ConfigDef.ValidString.in(names());

      @Override
      public void ensureValid(String name, Object value) {
        Object normalized = value;
        if (normalized instanceof String) {
          normalized = ((String) normalized).toLowerCase(Locale.ROOT);
        }
        delegate.ensureValid(name, normalized);
      }

      // Overridden so that ConfigDef.toEnrichedRst shows the possible values correctly.
      @Override
      public String toString() {
        return delegate.toString();
      }

    };

    /**
     * Return the lowercase names of all constants, in declaration order.
     */
    public static String[] names() {
      BehaviorOnNullValues[] all = values();
      String[] lowercased = new String[all.length];

      int i = 0;
      for (BehaviorOnNullValues behavior : all) {
        lowercased[i++] = behavior.toString();
      }

      return lowercased;
    }

    /**
     * Parse a (case-insensitive) configuration value into its enum constant.
     */
    public static BehaviorOnNullValues forValue(String value) {
      return valueOf(value.toUpperCase(Locale.ROOT));
    }

    @Override
    public String toString() {
      return name().toLowerCase(Locale.ROOT);
    }
  }
}
+12 −2
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigDef.Width;

import java.util.Map;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static final String CONNECTION_URL_CONFIG = "connection.url";
@@ -43,6 +45,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  public static final String COMPACT_MAP_ENTRIES_CONFIG = "compact.map.entries";
  public static final String CONNECTION_TIMEOUT_MS_CONFIG = "connection.timeout.ms";
  public static final String READ_TIMEOUT_MS_CONFIG = "read.timeout.ms";
  public static final String BEHAVIOR_ON_NULL_VALUES_CONFIG = "behavior.on.null.values";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -82,7 +85,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
                  group, ++order, Width.SHORT, "Max Retries")
          .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, Importance.LOW,
                  "How long to wait in milliseconds before attempting the first retry of a failed indexing request. "
                  + "Upon a failure, this connector may wait up to twice as long as the previous wait, up to the maximum number of retries. "
                  + "This connector uses exponential backoff with jitter, which means that upon "
                  + "additional failures, this connector may wait up to twice as long as the previous wait, up to the maximum number of retries. "
                  + "This avoids retrying in a tight loop under failure scenarios.",
                  group, ++order, Width.SHORT, "Retry Backoff (ms)")
          .define(CONNECTION_TIMEOUT_MS_CONFIG, Type.INT, 1000, Importance.LOW, "How long to wait "
@@ -132,7 +136,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
                  group, ++order, Width.LONG, "Topics for 'Ignore Key' mode")
          .define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, "", Importance.LOW,
                  "List of topics for which ``" + SCHEMA_IGNORE_CONFIG + "`` should be ``true``.",
                  group, ++order, Width.LONG, "Topics for 'Ignore Schema' mode");
                  group, ++order, Width.LONG, "Topics for 'Ignore Schema' mode")
          .define(BEHAVIOR_ON_NULL_VALUES_CONFIG, Type.STRING,
                  BehaviorOnNullValues.DEFAULT.toString(), BehaviorOnNullValues.VALIDATOR,
                  Importance.LOW,
                  "How to handle records with a non-null key and a null value (i.e. Kafka tombstone "
                  + "records). Valid options are 'ignore', 'delete', and 'fail'.",
                  group, ++order, Width.SHORT, "Behavior for null-valued records");
    }

    return configDef;
+9 −1
Original line number Diff line number Diff line
@@ -37,6 +37,8 @@ import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;

public class ElasticsearchSinkTask extends SinkTask {

  private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
@@ -78,6 +80,11 @@ public class ElasticsearchSinkTask extends SinkTask {
      int connTimeout = config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
      int readTimeout = config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);

      BehaviorOnNullValues behaviorOnNullValues =
          BehaviorOnNullValues.forValue(
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
          );

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
      if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
@@ -114,7 +121,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setBatchSize(batchSize)
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry);
          .setMaxRetry(maxRetry)
          .setBehaviorOnNullValues(behaviorOnNullValues);

      writer = builder.build();
      writer.start();
Loading