Commit 58e6227c authored by Chris Egerton's avatar Chris Egerton
Browse files

Merge branch '4.0.x'

parents 9234e42a e3049c93
Loading
Loading
Loading
Loading
+8 −3
Original line number Diff line number Diff line
Configuration Options
---------------------

Connector
^^^^^^^^^

@@ -138,3 +135,11 @@ Data Conversion
  * Type: boolean
  * Default: false
  * Importance: low

``behavior.on.null.values``
  How to handle records with a non-null key and a null value (i.e. Kafka tombstone records). Valid options are 'ignore', 'delete', and 'fail'.

  * Type: string
  * Default: ignore
  * Valid Values: [ignore, delete, fail]
  * Importance: low
 No newline at end of file
+1 −1
Original line number Diff line number Diff line
@@ -48,7 +48,7 @@ public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {
  public Bulk bulkRequest(List<IndexableRecord> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexableRecord record : batch) {
      builder.addAction(record.toIndexRequest());
      builder.addAction(record.toBulkableAction());
    }
    return builder.build();
  }
+122 −12
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -38,7 +39,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
@@ -53,9 +56,23 @@ public class DataConverter {
  }

  private final boolean useCompactMapEntries;
  private final BehaviorOnNullValues behaviorOnNullValues;

  public DataConverter(boolean useCompactMapEntries) {
  /**
   * Create a DataConverter, specifying how map entries with string keys within record
   * values should be written to JSON. Compact map entries are written as
   * <code>"entryKey": "entryValue"</code>, while the non-compact form are written as a nested
   * document such as <code>{"key": "entryKey", "value": "entryValue"}</code>. All map entries
   * with non-string keys are always written as nested documents.
   *
   * @param useCompactMapEntries true for compact map entries with string keys, or false for
   *                             the nested document form.
   * @param behaviorOnNullValues behavior for handling records with null values; may not be null
   */
  public DataConverter(boolean useCompactMapEntries, BehaviorOnNullValues behaviorOnNullValues) {
    this.useCompactMapEntries = useCompactMapEntries;
    this.behaviorOnNullValues =
        Objects.requireNonNull(behaviorOnNullValues, "behaviorOnNullValues cannot be null.");
  }

  private String convertKey(Schema keySchema, Object key) {
@@ -96,6 +113,45 @@ public class DataConverter {
      boolean ignoreKey,
      boolean ignoreSchema
  ) {
    if (record.value() == null) {
      switch (behaviorOnNullValues) {
        case IGNORE:
          return null;
        case DELETE:
          if (record.key() == null) {
            // Since the record key is used as the ID of the index to delete and we don't have a key
            // for this record, we can't delete anything anyways, so we ignore the record.
            // We can also disregard the value of the ignoreKey parameter, since even if it's true
            // the resulting index we'd try to delete would be based solely off topic/partition/
            // offset information for the SinkRecord. Since that information is guaranteed to be
            // unique per message, we can be confident that there wouldn't be any corresponding
            // index present in ES to delete anyways.
            return null;
          }
          // Will proceed as normal, ultimately creating an IndexableRecord with a null payload
          break;
        case FAIL:
          throw new DataException(String.format(
              "Sink record with key of %s and null value encountered for topic/partition/offset "
              + "%s/%s/%s (to ignore future records like this change the configuration property "
              + "'%s' from '%s' to '%s')",
              record.key(),
              record.topic(),
              record.kafkaPartition(),
              record.kafkaOffset(),
              ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG,
              BehaviorOnNullValues.FAIL,
              BehaviorOnNullValues.IGNORE
          ));
        default:
          throw new RuntimeException(String.format(
              "Unknown value for %s enum: %s",
              BehaviorOnNullValues.class.getSimpleName(),
              behaviorOnNullValues
          ));
      }
    }

    final String id;
    if (ignoreKey) {
      id = record.topic()
@@ -105,20 +161,26 @@ public class DataConverter {
      id = convertKey(record.keySchema(), record.key());
    }

    final Schema schema;
    final Object value;
    if (!ignoreSchema) {
      schema = preProcessSchema(record.valueSchema());
      value = preProcessValue(record.value(), record.valueSchema(), schema);
    } else {
      schema = record.valueSchema();
      value = record.value();
    final String payload = getPayload(record, ignoreSchema);
    final Long version = ignoreKey ? null : record.kafkaOffset();
    return new IndexableRecord(new Key(index, type, id), payload, version);
  }

  private String getPayload(SinkRecord record, boolean ignoreSchema) {
    if (record.value() == null) {
      return null;
    }

    Schema schema = ignoreSchema
        ? record.valueSchema()
        : preProcessSchema(record.valueSchema());

    Object value = ignoreSchema
        ? record.value()
        : preProcessValue(record.value(), record.valueSchema(), schema);

    byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);
    final String payload = new String(rawJsonPayload, StandardCharsets.UTF_8);
    final Long version = ignoreKey ? null : record.kafkaOffset();
    return new IndexableRecord(new Key(index, type, id), payload, version);
    return new String(rawJsonPayload, StandardCharsets.UTF_8);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
@@ -307,4 +369,52 @@ public class DataConverter {
    }
    return newStruct;
  }

  public enum BehaviorOnNullValues {
    IGNORE,
    DELETE,
    FAIL;

    public static final BehaviorOnNullValues DEFAULT = IGNORE;

    // Want values for "behavior.on.null.values" property to be case-insensitive
    public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
      private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names());

      @Override
      public void ensureValid(String name, Object value) {
        if (value instanceof String) {
          value = ((String) value).toLowerCase(Locale.ROOT);
        }
        validator.ensureValid(name, value);
      }

      // Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly
      @Override
      public String toString() {
        return validator.toString();
      }

    };

    public static String[] names() {
      BehaviorOnNullValues[] behaviors = values();
      String[] result = new String[behaviors.length];

      for (int i = 0; i < behaviors.length; i++) {
        result[i] = behaviors[i].toString();
      }

      return result;
    }

    public static BehaviorOnNullValues forValue(String value) {
      return valueOf(value.toUpperCase(Locale.ROOT));
    }

    @Override
    public String toString() {
      return name().toLowerCase(Locale.ROOT);
    }
  }
}
+26 −7
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigDef.Width;

import java.util.Map;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static final String CONNECTION_URL_CONFIG = "connection.url";
@@ -125,6 +127,12 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + "if any read operation times out, and will need to be restarted to resume "
      + "further operations.";

  public static final String BEHAVIOR_ON_NULL_VALUES_CONFIG = "behavior.on.null.values";
  private static final String BEHAVIOR_ON_NULL_VALUES_DOC = "How to handle records with a "
      + "non-null key and a null value (i.e. Kafka tombstone records). Valid options are "
      + "'ignore', 'delete', and 'fail'.";


  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
    addConnectorConfigs(configDef);
@@ -317,7 +325,18 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        group,
        ++order,
        Width.LONG,
        "Drop invalid messages");
        "Drop invalid messages"
    ).define(
        BEHAVIOR_ON_NULL_VALUES_CONFIG,
        Type.STRING,
        BehaviorOnNullValues.DEFAULT.toString(),
        BehaviorOnNullValues.VALIDATOR,
        Importance.LOW,
        BEHAVIOR_ON_NULL_VALUES_DOC,
        group,
        ++order,
        Width.SHORT,
        "Behavior for null-valued records");
  }

  public static final ConfigDef CONFIG = baseConfigDef();
+9 −1
Original line number Diff line number Diff line
@@ -37,6 +37,8 @@ import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;

public class ElasticsearchSinkTask extends SinkTask {

  private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
@@ -93,6 +95,11 @@ public class ElasticsearchSinkTask extends SinkTask {
      boolean dropInvalidMessage =
          config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);

      BehaviorOnNullValues behaviorOnNullValues =
          BehaviorOnNullValues.forValue(
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
          );

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
      if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
@@ -137,7 +144,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setDropInvalidMessage(dropInvalidMessage);
          .setDropInvalidMessage(dropInvalidMessage)
          .setBehaviorOnNullValues(behaviorOnNullValues);

      writer = builder.build();
      writer.start();
Loading