Commit 91d7e686 authored by Randall Hauch's avatar Randall Hauch
Browse files

Merge branch '3.4.x' into 4.0.x

parents 0ea6abf3 cf3245de
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -88,6 +88,13 @@ Data Conversion
  * Default: false
  * Importance: low

``compact.map.entries``
  Defines how map entries with string keys within record values should be written to JSON. When this is set to ``true``, these entries are written compactly as ``"entryKey": "entryValue"``. Otherwise, map entries with string keys are written as a nested document ``{"key": "entryKey", "value": "entryValue"}``. All map entries with non-string keys are always written as nested documents. Prior to 3.3.0, this connector always wrote map entries as nested documents, so set this to ``false`` to use that older behavior.

  * Type: boolean
  * Default: true
  * Importance: low

``topic.index.map``
  A map from Kafka topic name to the destination Elasticsearch index, represented as a list of ``topic:index`` pairs.

+23 −16
Original line number Diff line number Diff line
@@ -52,7 +52,13 @@ public class DataConverter {
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  private static String convertKey(Schema keySchema, Object key) {
  private final boolean useCompactMapEntries;

  public DataConverter(boolean useCompactMapEntries) {
    this.useCompactMapEntries = useCompactMapEntries;
  }

  private String convertKey(Schema keySchema, Object key) {
    if (key == null) {
      throw new ConnectException("Key is used as document id and can not be null.");
    }
@@ -83,7 +89,7 @@ public class DataConverter {
    }
  }

  public static IndexableRecord convertRecord(
  public IndexableRecord convertRecord(
      SinkRecord record,
      String index,
      String type,
@@ -96,7 +102,7 @@ public class DataConverter {
           + "+" + String.valueOf((int) record.kafkaPartition())
           + "+" + String.valueOf(record.kafkaOffset());
    } else {
      id = DataConverter.convertKey(record.keySchema(), record.key());
      id = convertKey(record.keySchema(), record.key());
    }

    final Schema schema;
@@ -120,7 +126,8 @@ public class DataConverter {
  // completely rewrite a converter for Elasticsearch, we will refactor the JSON converter to
  // support customized translation. The pre process is no longer needed once we have the JSON
  // converter refactored.
  static Schema preProcessSchema(Schema schema) {
  // visible for testing
  Schema preProcessSchema(Schema schema) {
    if (schema == null) {
      return null;
    }
@@ -153,19 +160,19 @@ public class DataConverter {
    }
  }

  private static Schema preProcessArraySchema(Schema schema) {
  private Schema preProcessArraySchema(Schema schema) {
    Schema valSchema = preProcessSchema(schema.valueSchema());
    return copySchemaBasics(schema, SchemaBuilder.array(valSchema)).build();
  }

  private static Schema preProcessMapSchema(Schema schema) {
  private Schema preProcessMapSchema(Schema schema) {
    Schema keySchema = schema.keySchema();
    Schema valueSchema = schema.valueSchema();
    String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name();
    String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name();
    Schema preprocessedKeySchema = preProcessSchema(keySchema);
    Schema preprocessedValueSchema = preProcessSchema(valueSchema);
    if (keySchema.type() == Schema.Type.STRING) {
    if (useCompactMapEntries && keySchema.type() == Schema.Type.STRING) {
      return SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema).build();
    }
    Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
@@ -175,7 +182,7 @@ public class DataConverter {
    return copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build();
  }

  private static Schema preProcessStructSchema(Schema schema) {
  private Schema preProcessStructSchema(Schema schema) {
    SchemaBuilder builder = copySchemaBasics(schema, SchemaBuilder.struct().name(schema.name()));
    for (Field field : schema.fields()) {
      builder.field(field.name(), preProcessSchema(field.schema()));
@@ -183,7 +190,7 @@ public class DataConverter {
    return builder.build();
  }

  private static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) {
  private SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) {
    if (source.isOptional()) {
      target.optional();
    }
@@ -195,7 +202,7 @@ public class DataConverter {
  }

  // visible for testing
  static Object preProcessValue(Object value, Schema schema, Schema newSchema) {
  Object preProcessValue(Object value, Schema schema, Schema newSchema) {
    // Handle missing schemas and acceptable null values
    if (schema == null) {
      return value;
@@ -229,7 +236,7 @@ public class DataConverter {
    }
  }

  private static Object preProcessNullValue(Schema schema) {
  private Object preProcessNullValue(Schema schema) {
    if (schema.defaultValue() != null) {
      return schema.defaultValue();
    }
@@ -240,7 +247,7 @@ public class DataConverter {
  }

  // @returns the decoded logical value or null if this isn't a known logical type
  private static Object preProcessLogicalValue(String schemaName, Object value) {
  private Object preProcessLogicalValue(String schemaName, Object value) {
    switch (schemaName) {
      case Decimal.LOGICAL_NAME:
        return ((BigDecimal) value).doubleValue();
@@ -254,7 +261,7 @@ public class DataConverter {
    }
  }

  private static Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) {
  private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) {
    Collection collection = (Collection) value;
    List<Object> result = new ArrayList<>();
    for (Object element: collection) {
@@ -263,12 +270,12 @@ public class DataConverter {
    return result;
  }

  private static Object preProcessMapValue(Object value, Schema schema, Schema newSchema) {
  private Object preProcessMapValue(Object value, Schema schema, Schema newSchema) {
    Schema keySchema = schema.keySchema();
    Schema valueSchema = schema.valueSchema();
    Schema newValueSchema = newSchema.valueSchema();
    Map<?, ?> map = (Map<?, ?>) value;
    if (keySchema.type() == Schema.Type.STRING) {
    if (useCompactMapEntries && keySchema.type() == Schema.Type.STRING) {
      Map<Object, Object> processedMap = new HashMap<>();
      for (Map.Entry<?, ?> entry: map.entrySet()) {
        processedMap.put(
@@ -290,7 +297,7 @@ public class DataConverter {
    return mapStructs;
  }

  private static Object preProcessStructValue(Object value, Schema schema, Schema newSchema) {
  private Object preProcessStructValue(Object value, Schema schema, Schema newSchema) {
    Struct struct = (Struct) value;
    Struct newStruct = new Struct(newSchema);
    for (Field field : schema.fields()) {
+20 −0
Original line number Diff line number Diff line
@@ -99,6 +99,16 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final String DROP_INVALID_MESSAGE_DOC =
          "Whether to drop kafka message when it cannot be converted to output message.";

  public static final String COMPACT_MAP_ENTRIES_CONFIG = "compact.map.entries";
  private static final String COMPACT_MAP_ENTRIES_DOC =
      "Defines how map entries with string keys within record values should be written to JSON. "
      + "When this is set to ``true``, these entries are written compactly as "
      + "``\"entryKey\": \"entryValue\"``. "
      + "Otherwise, map entries with string keys are written as a nested document "
      + "``{\"key\": \"entryKey\", \"value\": \"entryValue\"}``. "
      + "All map entries with non-string keys are always written as nested documents. "
      + "Prior to 3.3.0, this connector always wrote map entries as nested documents, "
      + "so set this to ``false`` to use that older behavior.";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -224,6 +234,16 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        ++order,
        Width.SHORT,
        "Ignore Schema mode"
    ).define(
        COMPACT_MAP_ENTRIES_CONFIG,
        Type.BOOLEAN,
        true,
        Importance.LOW,
        COMPACT_MAP_ENTRIES_DOC,
        group,
        ++order,
        Width.SHORT,
        "Compact Map Entries"
    ).define(
        TOPIC_INDEX_MAP_CONFIG,
        Type.LIST,
+3 −0
Original line number Diff line number Diff line
@@ -64,6 +64,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          config.getBoolean(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG);
      boolean ignoreSchema =
          config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);
      boolean useCompactMapEntries =
          config.getBoolean(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG);


      Map<String, String> topicToIndexMap =
@@ -120,6 +122,7 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setType(type)
          .setIgnoreKey(ignoreKey, topicIgnoreKey)
          .setIgnoreSchema(ignoreSchema, topicIgnoreSchema)
          .setCompactMapEntries(useCompactMapEntries)
          .setTopicToIndexMap(topicToIndexMap)
          .setFlushTimoutMs(flushTimeoutMs)
          .setMaxBufferedRecords(maxBufferedRecords)
+11 −1
Original line number Diff line number Diff line
@@ -50,12 +50,14 @@ public class ElasticsearchWriter {
  private final long flushTimeoutMs;
  private final BulkProcessor<IndexableRecord, ?> bulkProcessor;
  private final boolean dropInvalidMessage;
  private final DataConverter converter;

  private final Set<String> existingMappings;

  ElasticsearchWriter(
      JestClient client,
      String type,
      boolean useCompactMapEntries,
      boolean ignoreKey,
      Set<String> ignoreKeyTopics,
      boolean ignoreSchema,
@@ -79,6 +81,7 @@ public class ElasticsearchWriter {
    this.topicToIndexMap = topicToIndexMap;
    this.flushTimeoutMs = flushTimeoutMs;
    this.dropInvalidMessage = dropInvalidMessage;
    this.converter = new DataConverter(useCompactMapEntries);

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -97,6 +100,7 @@ public class ElasticsearchWriter {
  public static class Builder {
    private final JestClient client;
    private String type;
    private boolean useCompactMapEntries = true;
    private boolean ignoreKey = false;
    private Set<String> ignoreKeyTopics = Collections.emptySet();
    private boolean ignoreSchema = false;
@@ -132,6 +136,11 @@ public class ElasticsearchWriter {
      return this;
    }

    public Builder setCompactMapEntries(boolean useCompactMapEntries) {
      this.useCompactMapEntries = useCompactMapEntries;
      return this;
    }

    public Builder setTopicToIndexMap(Map<String, String> topicToIndexMap) {
      this.topicToIndexMap = topicToIndexMap;
      return this;
@@ -181,6 +190,7 @@ public class ElasticsearchWriter {
      return new ElasticsearchWriter(
          client,
          type,
          useCompactMapEntries,
          ignoreKey,
          ignoreKeyTopics,
          ignoreSchema,
@@ -241,7 +251,7 @@ public class ElasticsearchWriter {
    IndexableRecord indexableRecord = null;

    try {
      indexableRecord = DataConverter.convertRecord(
      indexableRecord = converter.convertRecord(
              sinkRecord,
              index,
              type,
Loading