Commit e3049c93 authored by Chris Egerton's avatar Chris Egerton
Browse files

Merge branch '3.3.x' into 4.0.x

Some slight stylistic changes were made in order to avoid exceeding cyclomatic
complexity requirements for Checkstyle.
parents 57ce3faa 5d401fee
Loading
Loading
Loading
Loading
+8 −3
Original line number Diff line number Diff line
Configuration Options
---------------------

Connector
^^^^^^^^^

@@ -136,3 +133,11 @@ Data Conversion
  * Type: boolean
  * Default: false
  * Importance: low

``behavior.on.null.values``
  How to handle records with a non-null key and a null value (i.e. Kafka tombstone records). Valid options are 'ignore', 'delete', and 'fail'.

  * Type: string
  * Default: ignore
  * Valid Values: [ignore, delete, fail]
  * Importance: low
 No newline at end of file
+1 −1
Original line number Diff line number Diff line
@@ -48,7 +48,7 @@ public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {
  /**
   * Build a single Jest {@link Bulk} request containing one bulkable action per record.
   *
   * <p>Each record supplies its own action via {@code toBulkableAction()}, which allows
   * records to map to either index or delete actions (e.g. tombstones).
   *
   * @param batch the records to include in the bulk request; may be empty
   * @return the assembled bulk request
   */
  public Bulk bulkRequest(List<IndexableRecord> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexableRecord record : batch) {
      // Fixed: the flattened diff had retained the superseded
      // builder.addAction(record.toIndexRequest()) line as well, which would have
      // added every record to the bulk request twice.
      builder.addAction(record.toBulkableAction());
    }
    return builder.build();
  }
+122 −12
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -38,7 +39,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
@@ -53,9 +56,23 @@ public class DataConverter {
  }

  private final boolean useCompactMapEntries;
  private final BehaviorOnNullValues behaviorOnNullValues;

  /**
   * Create a DataConverter, specifying how map entries with string keys within record
   * values should be written to JSON. Compact map entries are written as
   * <code>"entryKey": "entryValue"</code>, while the non-compact form are written as a nested
   * document such as <code>{"key": "entryKey", "value": "entryValue"}</code>. All map entries
   * with non-string keys are always written as nested documents.
   *
   * @param useCompactMapEntries true for compact map entries with string keys, or false for
   *                             the nested document form.
   * @param behaviorOnNullValues behavior for handling records with null values; may not be null
   */
  public DataConverter(boolean useCompactMapEntries, BehaviorOnNullValues behaviorOnNullValues) {
    this.useCompactMapEntries = useCompactMapEntries;
    this.behaviorOnNullValues =
        Objects.requireNonNull(behaviorOnNullValues, "behaviorOnNullValues cannot be null.");
  }

  private String convertKey(Schema keySchema, Object key) {
@@ -96,6 +113,45 @@ public class DataConverter {
      boolean ignoreKey,
      boolean ignoreSchema
  ) {
    if (record.value() == null) {
      switch (behaviorOnNullValues) {
        case IGNORE:
          return null;
        case DELETE:
          if (record.key() == null) {
            // Since the record key is used as the ID of the index to delete and we don't have a key
            // for this record, we can't delete anything anyways, so we ignore the record.
            // We can also disregard the value of the ignoreKey parameter, since even if it's true
            // the resulting index we'd try to delete would be based solely off topic/partition/
            // offset information for the SinkRecord. Since that information is guaranteed to be
            // unique per message, we can be confident that there wouldn't be any corresponding
            // index present in ES to delete anyways.
            return null;
          }
          // Will proceed as normal, ultimately creating an IndexableRecord with a null payload
          break;
        case FAIL:
          throw new DataException(String.format(
              "Sink record with key of %s and null value encountered for topic/partition/offset "
              + "%s/%s/%s (to ignore future records like this change the configuration property "
              + "'%s' from '%s' to '%s')",
              record.key(),
              record.topic(),
              record.kafkaPartition(),
              record.kafkaOffset(),
              ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG,
              BehaviorOnNullValues.FAIL,
              BehaviorOnNullValues.IGNORE
          ));
        default:
          throw new RuntimeException(String.format(
              "Unknown value for %s enum: %s",
              BehaviorOnNullValues.class.getSimpleName(),
              behaviorOnNullValues
          ));
      }
    }

    final String id;
    if (ignoreKey) {
      id = record.topic()
@@ -105,20 +161,26 @@ public class DataConverter {
      id = convertKey(record.keySchema(), record.key());
    }

    final Schema schema;
    final Object value;
    if (!ignoreSchema) {
      schema = preProcessSchema(record.valueSchema());
      value = preProcessValue(record.value(), record.valueSchema(), schema);
    } else {
      schema = record.valueSchema();
      value = record.value();
    final String payload = getPayload(record, ignoreSchema);
    final Long version = ignoreKey ? null : record.kafkaOffset();
    return new IndexableRecord(new Key(index, type, id), payload, version);
  }

  /**
   * Serialize the record's value to a UTF-8 JSON payload.
   *
   * @param record       the sink record whose value is converted
   * @param ignoreSchema when true, the record's schema and value are used as-is; when false,
   *                     both are pre-processed into an Elasticsearch-compatible form first
   * @return the JSON payload, or {@code null} for a null-valued (tombstone) record, which
   *         callers turn into a delete action
   */
  private String getPayload(SinkRecord record, boolean ignoreSchema) {
    if (record.value() == null) {
      return null;
    }

    Schema schema = ignoreSchema
        ? record.valueSchema()
        : preProcessSchema(record.valueSchema());

    Object value = ignoreSchema
        ? record.value()
        : preProcessValue(record.value(), record.valueSchema(), schema);

    byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);
    // Fixed: the flattened diff had retained superseded lines here that referenced
    // out-of-scope locals (index, type, id, ignoreKey) and returned an IndexableRecord;
    // this method only produces the JSON string.
    return new String(rawJsonPayload, StandardCharsets.UTF_8);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
@@ -307,4 +369,52 @@ public class DataConverter {
    }
    return newStruct;
  }

  /**
   * Permitted behaviors for handling a record with a non-null key and a null value
   * (a Kafka tombstone record). The serialized form of each constant is its
   * lower-case name.
   */
  public enum BehaviorOnNullValues {
    IGNORE,
    DELETE,
    FAIL;

    /** Behavior applied when the configuration property is not set explicitly. */
    public static final BehaviorOnNullValues DEFAULT = IGNORE;

    /**
     * Case-insensitive validator for the "behavior.on.null.values" property. Lower-cases
     * string input before delegating to a {@link ConfigDef.ValidString} over {@link #names()}.
     */
    public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
      private final ConfigDef.ValidString delegate = ConfigDef.ValidString.in(names());

      @Override
      public void ensureValid(String name, Object value) {
        Object normalized = (value instanceof String)
            ? ((String) value).toLowerCase(Locale.ROOT)
            : value;
        delegate.ensureValid(name, normalized);
      }

      // Delegated so that ConfigDef.toEnrichedRst renders the valid values correctly.
      @Override
      public String toString() {
        return delegate.toString();
      }

    };

    /**
     * @return the lower-case names of all constants, in declaration order
     */
    public static String[] names() {
      BehaviorOnNullValues[] constants = values();
      String[] names = new String[constants.length];
      int i = 0;
      for (BehaviorOnNullValues constant : constants) {
        names[i++] = constant.toString();
      }
      return names;
    }

    /**
     * Parse a configuration value into its constant, ignoring case.
     *
     * @param value the configured string; must match a constant name case-insensitively
     * @return the matching constant
     */
    public static BehaviorOnNullValues forValue(String value) {
      return valueOf(value.toUpperCase(Locale.ROOT));
    }

    @Override
    public String toString() {
      return name().toLowerCase(Locale.ROOT);
    }
  }
}
+26 −7
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigDef.Width;

import java.util.Map;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static final String CONNECTION_URL_CONFIG = "connection.url";
@@ -121,6 +123,12 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + "if any read operation times out, and will need to be restarted to resume "
      + "further operations.";

  public static final String BEHAVIOR_ON_NULL_VALUES_CONFIG = "behavior.on.null.values";
  private static final String BEHAVIOR_ON_NULL_VALUES_DOC = "How to handle records with a "
      + "non-null key and a null value (i.e. Kafka tombstone records). Valid options are "
      + "'ignore', 'delete', and 'fail'.";


  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
    addConnectorConfigs(configDef);
@@ -313,7 +321,18 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        group,
        ++order,
        Width.LONG,
        "Drop invalid messages");
        "Drop invalid messages"
    ).define(
        BEHAVIOR_ON_NULL_VALUES_CONFIG,
        Type.STRING,
        BehaviorOnNullValues.DEFAULT.toString(),
        BehaviorOnNullValues.VALIDATOR,
        Importance.LOW,
        BEHAVIOR_ON_NULL_VALUES_DOC,
        group,
        ++order,
        Width.SHORT,
        "Behavior for null-valued records");
  }

  public static final ConfigDef CONFIG = baseConfigDef();
+9 −1
Original line number Diff line number Diff line
@@ -37,6 +37,8 @@ import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;

public class ElasticsearchSinkTask extends SinkTask {

  private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
@@ -93,6 +95,11 @@ public class ElasticsearchSinkTask extends SinkTask {
      boolean dropInvalidMessage =
          config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);

      BehaviorOnNullValues behaviorOnNullValues =
          BehaviorOnNullValues.forValue(
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
          );

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
      if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
@@ -137,7 +144,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setDropInvalidMessage(dropInvalidMessage);
          .setDropInvalidMessage(dropInvalidMessage)
          .setBehaviorOnNullValues(behaviorOnNullValues);

      writer = builder.build();
      writer.start();
Loading