Commit 5d648662 authored by Randall Hauch's avatar Randall Hauch Committed by GitHub
Browse files

Merge pull request #126 from rhauch/cc-1191

CC-1191: Added feature flag to control string-keyed map entry JSON serialization
parents 7f0217af 66b8b3c5
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -88,6 +88,13 @@ Data Conversion
  * Default: false
  * Importance: low

``compact.map.entries``
  Defines how map entries with string keys within record values should be written to JSON. When this is set to ``true``, these entries are written compactly as ``"entryKey": "entryValue"``. Otherwise, map entries with string keys are written as a nested document ``{"key": "entryKey", "value": "entryValue"}``. All map entries with non-string keys are always written as nested documents. Prior to 3.3.0, this connector always wrote map entries as nested documents, so set this to ``false`` to use that older behavior.

  * Type: boolean
  * Default: true
  * Importance: low

``topic.index.map``
  A map from Kafka topic name to the destination Elasticsearch index, represented as a list of ``topic:index`` pairs.

+15 −8
Original line number Diff line number Diff line
@@ -51,7 +51,13 @@ public class DataConverter {
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  private static String convertKey(Schema keySchema, Object key) {
  private final boolean useCompactMapEntries;

  /**
   * Create a converter.
   *
   * @param useCompactMapEntries whether map entries whose keys are strings should be written
   *        compactly as {@code "entryKey": "entryValue"}; when {@code false}, such entries are
   *        written as nested {@code {"key": ..., "value": ...}} documents (the pre-3.3.0 behavior)
   */
  public DataConverter(boolean useCompactMapEntries) {
    this.useCompactMapEntries = useCompactMapEntries;
  }

  private String convertKey(Schema keySchema, Object key) {
    if (key == null) {
      throw new ConnectException("Key is used as document id and can not be null.");
    }
@@ -78,12 +84,12 @@ public class DataConverter {
    }
  }

  public static IndexableRecord convertRecord(SinkRecord record, String index, String type, boolean ignoreKey, boolean ignoreSchema) {
  public IndexableRecord convertRecord(SinkRecord record, String index, String type, boolean ignoreKey, boolean ignoreSchema) {
    final String id;
    if (ignoreKey) {
      id = record.topic() + "+" + String.valueOf((int) record.kafkaPartition()) + "+" + String.valueOf(record.kafkaOffset());
    } else {
      id = DataConverter.convertKey(record.keySchema(), record.key());
      id = convertKey(record.keySchema(), record.key());
    }

    final Schema schema;
@@ -105,7 +111,8 @@ public class DataConverter {
  // expects a different JSON format from what the current JSON converter provides. Rather than
  // completely rewriting a converter for Elasticsearch, we will refactor the JSON converter to
  // support customized translation. This pre-processing will no longer be needed once the JSON
  // converter has been refactored.
  static Schema preProcessSchema(Schema schema) {
  // visible for testing
  Schema preProcessSchema(Schema schema) {
    if (schema == null) {
      return null;
    }
@@ -132,7 +139,7 @@ public class DataConverter {
        Schema valueSchema = schema.valueSchema();
        String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name();
        String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name();
        if (keySchema.type() == Schema.Type.STRING) {
        if (useCompactMapEntries && keySchema.type() == Schema.Type.STRING) {
          return SchemaBuilder.map(preProcessSchema(keySchema), preProcessSchema(valueSchema)).build();
        }
        Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
@@ -154,7 +161,7 @@ public class DataConverter {
    }
  }

  private static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) {
  private SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) {
    if (source.isOptional()) {
      target.optional();
    }
@@ -166,7 +173,7 @@ public class DataConverter {
  }

  // visible for testing
  static Object preProcessValue(Object value, Schema schema, Schema newSchema) {
  Object preProcessValue(Object value, Schema schema, Schema newSchema) {
    if (schema == null) {
      return value;
    }
@@ -207,7 +214,7 @@ public class DataConverter {
        Schema valueSchema = schema.valueSchema();
        Schema newValueSchema = newSchema.valueSchema();
        Map<?, ?> map = (Map<?, ?>) value;
        if (keySchema.type() == Schema.Type.STRING) {
        if (useCompactMapEntries && keySchema.type() == Schema.Type.STRING) {
          Map<Object, Object> processedMap = new HashMap<>();
          for (Map.Entry<?, ?> entry: map.entrySet()) {
            processedMap.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()),
+10 −0
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  public static final String TOPIC_KEY_IGNORE_CONFIG = "topic.key.ignore";
  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";
  public static final String COMPACT_MAP_ENTRIES_CONFIG = "compact.map.entries";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -102,6 +103,15 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
                  + "Elasticsearch will infer the mapping from the data (dynamic mapping needs to be enabled by the user).\n"
                  + "Note that this is a global config that applies to all topics, use ``" + TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override as ``true`` for specific topics.",
                  group, ++order, Width.SHORT, "Ignore Schema mode")
          .define(COMPACT_MAP_ENTRIES_CONFIG, Type.BOOLEAN, true, Importance.LOW,
                  "Defines how map entries with string keys within record values should be written to JSON. "
                  + "When this is set to ``true``, these entries are written compactly as ``\"entryKey\": \"entryValue\"``. "
                  + "Otherwise, map entries with string keys are written as a nested document "
                  + "``{\"key\": \"entryKey\", \"value\": \"entryValue\"}``. "
                  + "All map entries with non-string keys are always written as nested documents. "
                  + "Prior to 3.3.0, this connector always wrote map entries as nested documents, so set this to ``false`` to use "
                  + "that older behavior.",
                  group, ++order, Width.SHORT, "Compact Map Entries")
          .define(TOPIC_INDEX_MAP_CONFIG, Type.LIST, "", Importance.LOW,
                  "A map from Kafka topic name to the destination Elasticsearch index, represented as a list of ``topic:index`` pairs.",
                  group, ++order, Width.LONG, "Topic to Index Map")
+2 −0
Original line number Diff line number Diff line
@@ -62,6 +62,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      String type = config.getString(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG);
      boolean ignoreKey = config.getBoolean(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG);
      boolean ignoreSchema = config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);
      boolean useCompactMapEntries = config.getBoolean(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG);

      Map<String, String> topicToIndexMap = parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
      Set<String> topicIgnoreKey = new HashSet<>(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_KEY_IGNORE_CONFIG));
@@ -98,6 +99,7 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setType(type)
          .setIgnoreKey(ignoreKey, topicIgnoreKey)
          .setIgnoreSchema(ignoreSchema, topicIgnoreSchema)
          .setCompactMapEntries(useCompactMapEntries)
          .setTopicToIndexMap(topicToIndexMap)
          .setFlushTimoutMs(flushTimeoutMs)
          .setMaxBufferedRecords(maxBufferedRecords)
+12 −1
Original line number Diff line number Diff line
@@ -49,12 +49,14 @@ public class ElasticsearchWriter {
  private final Map<String, String> topicToIndexMap;
  private final long flushTimeoutMs;
  private final BulkProcessor<IndexableRecord, ?> bulkProcessor;
  private final DataConverter converter;

  private final Set<String> existingMappings;

  ElasticsearchWriter(
      JestClient client,
      String type,
      boolean useCompactMapEntries,
      boolean ignoreKey,
      Set<String> ignoreKeyTopics,
      boolean ignoreSchema,
@@ -76,6 +78,7 @@ public class ElasticsearchWriter {
    this.ignoreSchemaTopics = ignoreSchemaTopics;
    this.topicToIndexMap = topicToIndexMap;
    this.flushTimeoutMs = flushTimeoutMs;
    this.converter = new DataConverter(useCompactMapEntries);

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -94,6 +97,7 @@ public class ElasticsearchWriter {
  public static class Builder {
    private final JestClient client;
    private String type;
    private boolean useCompactMapEntries = true;
    private boolean ignoreKey = false;
    private Set<String> ignoreKeyTopics = Collections.emptySet();
    private boolean ignoreSchema = false;
@@ -128,6 +132,11 @@ public class ElasticsearchWriter {
      return this;
    }

    /**
     * Set whether map entries with string keys in record values are serialized compactly as
     * {@code "entryKey": "entryValue"} rather than as nested
     * {@code {"key": ..., "value": ...}} documents. Defaults to {@code true}.
     *
     * @param useCompactMapEntries true to use the compact form for string-keyed map entries
     * @return this builder, so calls can be chained
     */
    public Builder setCompactMapEntries(boolean useCompactMapEntries) {
      this.useCompactMapEntries = useCompactMapEntries;
      return this;
    }

    public Builder setTopicToIndexMap(Map<String, String> topicToIndexMap) {
      this.topicToIndexMap = topicToIndexMap;
      return this;
@@ -172,6 +181,7 @@ public class ElasticsearchWriter {
      return new ElasticsearchWriter(
          client,
          type,
          useCompactMapEntries,
          ignoreKey,
          ignoreKeyTopics,
          ignoreSchema,
@@ -207,7 +217,8 @@ public class ElasticsearchWriter {
        existingMappings.add(index);
      }

      final IndexableRecord indexableRecord = DataConverter.convertRecord(sinkRecord, index, type, ignoreKey, ignoreSchema);
      final IndexableRecord indexableRecord = converter.convertRecord(sinkRecord, index, type,
                                                                      ignoreKey, ignoreSchema);

      bulkProcessor.add(indexableRecord, flushTimeoutMs);
    }
Loading