Commit 7fdccb20 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

Update tests to verify the source of each indexed document

Some refactoring that was helpful to achieve that:
- Move mapping creation out of DataConverter
- Fix handling of topic-level overrides: #23
parent ad8ed81d
Loading
Loading
Loading
Loading
+15 −64
Original line number Diff line number Diff line
@@ -31,16 +31,12 @@ import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import io.searchbox.client.JestClient;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
@@ -53,7 +49,7 @@ public class DataConverter {
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  public static String convertKey(Object key, Schema keySchema) {
  private static String convertKey(Schema keySchema, Object key) {
    if (key == null) {
      throw new ConnectException("Key is used as document id and can not be null.");
    }
@@ -78,71 +74,26 @@ public class DataConverter {
    }
  }

  public static IndexableRecord convertRecord(
      SinkRecord record,
      String type,
      JestClient client,
      boolean ignoreKey,
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
      Set<String> mappingCache) {

    String topic = record.topic();
    int partition = record.kafkaPartition();
    long offset = record.kafkaOffset();

    Object key = record.key();
    Schema keySchema = record.keySchema();
    Object value = record.value();
    Schema valueSchema = record.valueSchema();

    String index;
    String id;
    boolean topicIgnoreKey;
    boolean topicIgnoreSchema;

    if (topicConfigs.containsKey(topic)) {
      TopicConfig topicConfig = topicConfigs.get(topic);
      index = topicConfig.getIndex();
      topicIgnoreKey = topicConfig.ignoreKey();
      topicIgnoreSchema = topicConfig.ignoreSchema();
    } else {
      index = topic;
      topicIgnoreKey = ignoreKey;
      topicIgnoreSchema = ignoreSchema;
    }

    if (topicIgnoreKey) {
      id = topic + "+" + String.valueOf(partition) + "+" + String.valueOf(offset);
  public static IndexableRecord convertRecord(SinkRecord record, String index, String type, boolean ignoreKey, boolean ignoreSchema) {
    final String id;
    if (ignoreKey) {
      id = record.topic() + "+" + String.valueOf((int) record.kafkaPartition()) + "+" + String.valueOf(record.kafkaOffset());
    } else {
      id = DataConverter.convertKey(key, keySchema);
    }

    try {
      if (!topicIgnoreSchema && !mappingCache.contains(index) && !Mapping.doesMappingExist(client, index, type, mappingCache)) {
        Mapping.createMapping(client, index, type, valueSchema);
        mappingCache.add(index);
      }
    } catch (IOException e) {
      // TODO: It is possible that two clients are creating the mapping at the same time and
      // one request to create mapping may fail. In this case, we should allow the task to
      // proceed instead of throwing the exception.
      throw new ConnectException("Cannot create mapping:", e);
      id = DataConverter.convertKey(record.keySchema(), record.key());
    }

    Schema newSchema;
    Object newValue;
    if (!topicIgnoreSchema) {
      newSchema = preProcessSchema(valueSchema);
      newValue = preProcessValue(value, valueSchema, newSchema);
    final Schema schema;
    final Object value;
    if (!ignoreSchema) {
      schema = preProcessSchema(record.valueSchema());
      value = preProcessValue(record.value(), record.valueSchema(), schema);
    } else {
      newSchema = valueSchema;
      newValue = value;
      schema = record.valueSchema();
      value = record.value();
    }

    String payload = new String(JSON_CONVERTER.fromConnectData(topic, newSchema, newValue), StandardCharsets.UTF_8);

    return new IndexableRecord(new Key(index, type, id), payload, offset);
    final String payload = new String(JSON_CONVERTER.fromConnectData(record.topic(), schema, value), StandardCharsets.UTF_8);
    return new IndexableRecord(new Key(index, type, id), payload, record.kafkaOffset());
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
+7 −22
Original line number Diff line number Diff line
@@ -62,10 +62,9 @@ public class ElasticsearchSinkTask extends SinkTask {
      boolean ignoreKey = config.getBoolean(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG);
      boolean ignoreSchema = config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);

      List<String> topicIndex = config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG);
      List<String> topicIgnoreKey = config.getList(ElasticsearchSinkConnectorConfig.TOPIC_KEY_IGNORE_CONFIG);
      List<String> topicIgnoreSchema = config.getList(ElasticsearchSinkConnectorConfig.TOPIC_SCHEMA_IGNORE_CONFIG);
      Map<String, TopicConfig> topicConfigs = constructTopicConfig(topicIndex, topicIgnoreKey, topicIgnoreSchema);
      Map<String, String> topicToIndexMap = parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
      Set<String> topicIgnoreKey = new HashSet<>(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_KEY_IGNORE_CONFIG));
      Set<String> topicIgnoreSchema =  new HashSet<>(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_SCHEMA_IGNORE_CONFIG));

      long flushTimeoutMs = config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
      int maxBufferedRecords = config.getInt(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
@@ -86,9 +85,9 @@ public class ElasticsearchSinkTask extends SinkTask {

      ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
          .setType(type)
          .setIgnoreKey(ignoreKey)
          .setIgnoreSchema(ignoreSchema)
          .setTopicConfigs(topicConfigs)
          .setIgnoreKey(ignoreKey, topicIgnoreKey)
          .setIgnoreSchema(ignoreSchema, topicIgnoreSchema)
          .setTopicToIndexMap(topicToIndexMap)
          .setFlushTimoutMs(flushTimeoutMs)
          .setMaxBufferedRecords(maxBufferedRecords)
          .setMaxInFlightRequests(maxInFlightRequests)
@@ -111,7 +110,7 @@ public class ElasticsearchSinkTask extends SinkTask {
    for (TopicPartition tp : partitions) {
      topics.add(tp.topic());
    }
    writer.createIndices(topics);
    writer.createIndicesForTopics(topics);
  }

  @Override
@@ -153,18 +152,4 @@ public class ElasticsearchSinkTask extends SinkTask {
    return map;
  }

  private Map<String, TopicConfig> constructTopicConfig(List<String> topicType, List<String> topicIgnoreKey, List<String> topicIgnoreSchema) {
    // Combine the parallel per-topic settings into a single TopicConfig per topic.
    final Map<String, String> indexByTopic = parseMapConfig(topicType);
    final Set<String> keyIgnored = new HashSet<>(topicIgnoreKey);
    final Set<String> schemaIgnored = new HashSet<>(topicIgnoreSchema);

    final Map<String, TopicConfig> configs = new HashMap<>();
    for (Map.Entry<String, String> entry : indexByTopic.entrySet()) {
      final String topic = entry.getKey();
      configs.put(topic, new TopicConfig(entry.getValue(), keyIgnored.contains(topic), schemaIgnored.contains(topic)));
    }
    return configs;
  }

}
+69 −25
Original line number Diff line number Diff line
@@ -43,19 +43,23 @@ public class ElasticsearchWriter {
  private final JestClient client;
  private final String type;
  private final boolean ignoreKey;
  private final Set<String> ignoreKeyTopics;
  private final boolean ignoreSchema;
  private final Map<String, TopicConfig> topicConfigs;
  private final Set<String> ignoreSchemaTopics;
  private final Map<String, String> topicToIndexMap;
  private final long flushTimeoutMs;

  private final Set<String> mappings;
  private final BulkProcessor<IndexableRecord, ?> bulkProcessor;

  private final Set<String> existingMappings;

  ElasticsearchWriter(
      JestClient client,
      String type,
      boolean ignoreKey,
      Set<String> ignoreKeyTopics,
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
      Set<String> ignoreSchemaTopics,
      Map<String, String> topicToIndexMap,
      long flushTimeoutMs,
      int maxBufferedRecords,
      int maxInFlightRequests,
@@ -67,12 +71,12 @@ public class ElasticsearchWriter {
    this.client = client;
    this.type = type;
    this.ignoreKey = ignoreKey;
    this.ignoreKeyTopics = ignoreKeyTopics;
    this.ignoreSchema = ignoreSchema;
    this.topicConfigs = topicConfigs == null ? Collections.<String, TopicConfig>emptyMap() : topicConfigs;
    this.ignoreSchemaTopics = ignoreSchemaTopics;
    this.topicToIndexMap = topicToIndexMap;
    this.flushTimeoutMs = flushTimeoutMs;

    mappings = new HashSet<>();

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        new BulkIndexingClient(client),
@@ -83,14 +87,18 @@ public class ElasticsearchWriter {
        maxRetries,
        retryBackoffMs
    );

    existingMappings = new HashSet<>();
  }

  public static class Builder {
    private final JestClient client;
    private String type;
    private boolean ignoreKey = false;
    private Set<String> ignoreKeyTopics = Collections.emptySet();
    private boolean ignoreSchema = false;
    private Map<String, TopicConfig> topicConfigs = new HashMap<>();
    private Set<String> ignoreSchemaTopics = Collections.emptySet();
    private Map<String, String> topicToIndexMap = new HashMap<>();
    private long flushTimeoutMs;
    private int maxBufferedRecords;
    private int maxInFlightRequests;
@@ -108,18 +116,20 @@ public class ElasticsearchWriter {
      return this;
    }

    public Builder setIgnoreKey(boolean ignoreKey) {
    public Builder setIgnoreKey(boolean ignoreKey, Set<String> ignoreKeyTopics) {
      this.ignoreKey = ignoreKey;
      this.ignoreKeyTopics = ignoreKeyTopics;
      return this;
    }

    public Builder setIgnoreSchema(boolean ignoreSchema) {
    public Builder setIgnoreSchema(boolean ignoreSchema, Set<String> ignoreSchemaTopics) {
      this.ignoreSchema = ignoreSchema;
      this.ignoreSchemaTopics = ignoreSchemaTopics;
      return this;
    }

    public Builder setTopicConfigs(Map<String, TopicConfig> topicConfigs) {
      this.topicConfigs = topicConfigs;
    public Builder setTopicToIndexMap(Map<String, String> topicToIndexMap) {
      this.topicToIndexMap = topicToIndexMap;
      return this;
    }

@@ -160,14 +170,44 @@ public class ElasticsearchWriter {

    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs
          client,
          type,
          ignoreKey,
          ignoreKeyTopics,
          ignoreSchema,
          ignoreSchemaTopics,
          topicToIndexMap,
          flushTimeoutMs,
          maxBufferedRecords,
          maxInFlightRequests,
          batchSize,
          lingerMs,
          maxRetry,
          retryBackoffMs
      );
    }
  }

  public void write(Collection<SinkRecord> records) {
    for (SinkRecord sinkRecord : records) {
      final IndexableRecord indexableRecord = DataConverter.convertRecord(sinkRecord, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      final String index = sinkRecord.topic();
      final boolean ignoreKey = ignoreKeyTopics.contains(sinkRecord.topic()) || this.ignoreKey;
      final boolean ignoreSchema = ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;

      if (!ignoreSchema && !existingMappings.contains(index)) {
        try {
          if (!Mapping.doesMappingExist(client, index, type)) {
            Mapping.createMapping(client, index, type, sinkRecord.valueSchema());
          }
        } catch (IOException e) {
          // FIXME: concurrent tasks could attempt to create the mapping and one of the requests may fail
          throw new ConnectException("Failed to initialize mapping for index: " + index, e);
        }
        existingMappings.add(index);
      }

      final IndexableRecord indexableRecord = DataConverter.convertRecord(sinkRecord, index, type, ignoreKey, ignoreSchema);

      bulkProcessor.add(indexableRecord, flushTimeoutMs);
    }
  }
@@ -200,17 +240,8 @@ public class ElasticsearchWriter {
    }
  }

  public void createIndices(Set<String> assignedTopics) {
    Set<String> indices = new HashSet<>();
    for (String topic : assignedTopics) {
      final TopicConfig topicConfig = topicConfigs.get(topic);
      if (topicConfig != null) {
        indices.add(topicConfig.getIndex());
      } else {
        indices.add(topic);
      }
    }
    for (String index : indices) {
  public void createIndicesForTopics(Set<String> assignedTopics) {
    for (String index : indicesForTopics(assignedTopics)) {
      if (!indexExists(index)) {
        CreateIndex createIndex = new CreateIndex.Builder(index).build();
        try {
@@ -225,4 +256,17 @@ public class ElasticsearchWriter {
    }
  }

  private Set<String> indicesForTopics(Set<String> assignedTopics) {
    // Resolve each assigned topic to its target index, defaulting to the topic name
    // when no explicit topic-to-index mapping is configured.
    final Set<String> indices = new HashSet<>();
    for (String topic : assignedTopics) {
      final String mapped = topicToIndexMap.get(topic);
      indices.add(mapped == null ? topic : mapped);
    }
    return indices;
  }

}
+2 −10
Original line number Diff line number Diff line
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Set;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
@@ -73,10 +72,7 @@ public class Mapping {
   * @param type The type to check.
   * @return Whether the type exists or not.
   */
  public static boolean doesMappingExist(JestClient client, String index, String type, Set<String> mappings) throws IOException {
    if (mappings.contains(index)) {
      return true;
    }
  public static boolean doesMappingExist(JestClient client, String index, String type) throws IOException {
    GetMapping getMapping = new GetMapping.Builder().addIndex(index).addType(type).build();
    JestResult result = client.execute(getMapping);
    JsonObject resultJson = result.getJsonObject().getAsJsonObject(index);
@@ -84,11 +80,7 @@ public class Mapping {
      return false;
    }
    JsonObject typeJson = resultJson.getAsJsonObject(type);
    if (typeJson == null) {
      return false;
    }
    mappings.add(index);
    return true;
    return typeJson != null;
  }

  /**
+0 −41
Original line number Diff line number Diff line
/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 **/

package io.confluent.connect.elasticsearch;

/**
 * Immutable per-topic override settings for the Elasticsearch sink: the target
 * index plus whether key/schema handling is ignored for records from that topic.
 */
public class TopicConfig {
  private final String index;
  private final boolean ignoreKey;
  private final boolean ignoreSchema;

  /**
   * @param index the Elasticsearch index records from this topic are written to
   * @param ignoreKey whether to ignore the record key (document ids are then
   *        derived from topic+partition+offset instead)
   * @param ignoreSchema whether to skip schema-based mapping creation and value
   *        preprocessing for this topic
   */
  public TopicConfig(String index, boolean ignoreKey, boolean ignoreSchema) {
    this.index = index;
    this.ignoreKey = ignoreKey;
    this.ignoreSchema = ignoreSchema;
  }

  public String getIndex() {
    return index;
  }

  public boolean ignoreKey() {
    return ignoreKey;
  }

  public boolean ignoreSchema() {
    return ignoreSchema;
  }
}
Loading