Commit aa9fe36d authored by Shikhar Bhushan, committed by GitHub
Browse files

Merge pull request #30 from confluentinc/version-fencing

CC-312: fence writes with offset as ES document version
parents 7a024455 7fdccb20
Loading
Loading
Loading
Loading
+34 −34
Original line number Diff line number Diff line
@@ -18,23 +18,24 @@ package io.confluent.connect.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import io.confluent.connect.elasticsearch.bulk.BulkClient;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;

public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {

  private static final Logger LOG = LoggerFactory.getLogger(BulkIndexingClient.class);

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  private static final Set<String> NON_RETRIABLE_ERROR_TYPES = Collections.singleton("mapper_parse_exception");

  private final JestClient client;

@@ -45,52 +46,51 @@ public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {
  @Override
  public Bulk bulkRequest(List<IndexableRecord> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexableRecord request : batch) {
      builder.addAction(
          new Index.Builder(request.getPayload())
              .index(request.getIndex())
              .type(request.getType())
              .id(request.getId())
              .build()
      );
    for (IndexableRecord record : batch) {
      builder.addAction(record.toIndexRequest());
    }
    return builder.build();
  }

  @Override
  public BulkResponse execute(Bulk bulk) throws IOException {
    return toBulkResponse(client.execute(bulk));
  }
    final BulkResult result = client.execute(bulk);

  private static BulkResponse toBulkResponse(BulkResult result) {
    if (result.isSucceeded()) {
      return BulkResponse.success();
    }

    final List<BulkResult.BulkResultItem> failedItems = result.getFailedItems();
    if (failedItems.isEmpty()) {
      return BulkResponse.failure(true, result.getErrorMessage());
    }

    boolean retriable = true;
    final List<String> errors = new ArrayList<>(failedItems.size());
    for (BulkResult.BulkResultItem failedItem : failedItems) {
      errors.add(failedItem.error);
      retriable &= isRetriableError(failedItem.error);

    final List<Key> versionConflicts = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    for (BulkResult.BulkResultItem item : result.getItems()) {
      if (item.error != null) {
        final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(item.error);
        final String errorType = parsedError.get("type").asText("");
        if ("version_conflict_engine_exception".equals(errorType)) {
          versionConflicts.add(new Key(item.index, item.type, item.id));
        } else if ("mapper_parse_exception".equals(errorType)) {
          retriable = false;
          errors.add(item.error);
        } else {
          errors.add(item.error);
        }
      }
    return BulkResponse.failure(retriable, errors.toString());
    }

  private static boolean isRetriableError(String error) {
    if (error != null && !error.trim().isEmpty()) {
      try {
        final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(error);
        return !NON_RETRIABLE_ERROR_TYPES.contains(parsedError.get("type").asText());
      } catch (IOException e) {
        return true;
    if (!versionConflicts.isEmpty()) {
      LOG.warn("Ignoring version conflicts for items: {}", versionConflicts);
      if (errors.isEmpty()) {
        // The only errors were version conflicts
        return BulkResponse.success();
      }
    }
    return true;

    final String errorInfo = errors.isEmpty() ? result.getErrorMessage() : errors.toString();

    return BulkResponse.failure(retriable, errorInfo);
  }

}
+15 −64
Original line number Diff line number Diff line
@@ -31,16 +31,12 @@ import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import io.searchbox.client.JestClient;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
@@ -53,7 +49,7 @@ public class DataConverter {
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  public static String convertKey(Object key, Schema keySchema) {
  private static String convertKey(Schema keySchema, Object key) {
    if (key == null) {
      throw new ConnectException("Key is used as document id and can not be null.");
    }
@@ -78,71 +74,26 @@ public class DataConverter {
    }
  }

  public static IndexableRecord convertRecord(
      SinkRecord record,
      String type,
      JestClient client,
      boolean ignoreKey,
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
      Set<String> mappingCache) {

    String topic = record.topic();
    int partition = record.kafkaPartition();
    long offset = record.kafkaOffset();

    Object key = record.key();
    Schema keySchema = record.keySchema();
    Object value = record.value();
    Schema valueSchema = record.valueSchema();

    String index;
    String id;
    boolean topicIgnoreKey;
    boolean topicIgnoreSchema;

    if (topicConfigs.containsKey(topic)) {
      TopicConfig topicConfig = topicConfigs.get(topic);
      index = topicConfig.getIndex();
      topicIgnoreKey = topicConfig.ignoreKey();
      topicIgnoreSchema = topicConfig.ignoreSchema();
    } else {
      index = topic;
      topicIgnoreKey = ignoreKey;
      topicIgnoreSchema = ignoreSchema;
    }

    if (topicIgnoreKey) {
      id = topic + "+" + String.valueOf(partition) + "+" + String.valueOf(offset);
  public static IndexableRecord convertRecord(SinkRecord record, String index, String type, boolean ignoreKey, boolean ignoreSchema) {
    final String id;
    if (ignoreKey) {
      id = record.topic() + "+" + String.valueOf((int) record.kafkaPartition()) + "+" + String.valueOf(record.kafkaOffset());
    } else {
      id = DataConverter.convertKey(key, keySchema);
    }

    try {
      if (!topicIgnoreSchema && !mappingCache.contains(index) && !Mapping.doesMappingExist(client, index, type, mappingCache)) {
        Mapping.createMapping(client, index, type, valueSchema);
        mappingCache.add(index);
      }
    } catch (IOException e) {
      // TODO: It is possible that two clients are creating the mapping at the same time and
      // one request to create mapping may fail. In this case, we should allow the task to
      // proceed instead of throw the exception.
      throw new ConnectException("Cannot create mapping:", e);
      id = DataConverter.convertKey(record.keySchema(), record.key());
    }

    Schema newSchema;
    Object newValue;
    if (!topicIgnoreSchema) {
      newSchema = preProcessSchema(valueSchema);
      newValue = preProcessValue(value, valueSchema, newSchema);
    final Schema schema;
    final Object value;
    if (!ignoreSchema) {
      schema = preProcessSchema(record.valueSchema());
      value = preProcessValue(record.value(), record.valueSchema(), schema);
    } else {
      newSchema = valueSchema;
      newValue = value;
      schema = record.valueSchema();
      value = record.value();
    }

    String payload = new String(JSON_CONVERTER.fromConnectData(topic, newSchema, newValue), StandardCharsets.UTF_8);

    return new IndexableRecord(index, type, id, payload);
    final String payload = new String(JSON_CONVERTER.fromConnectData(record.topic(), schema, value), StandardCharsets.UTF_8);
    return new IndexableRecord(new Key(index, type, id), payload, record.kafkaOffset());
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
+7 −22
Original line number Diff line number Diff line
@@ -62,10 +62,9 @@ public class ElasticsearchSinkTask extends SinkTask {
      boolean ignoreKey = config.getBoolean(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG);
      boolean ignoreSchema = config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);

      List<String> topicIndex = config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG);
      List<String> topicIgnoreKey = config.getList(ElasticsearchSinkConnectorConfig.TOPIC_KEY_IGNORE_CONFIG);
      List<String> topicIgnoreSchema = config.getList(ElasticsearchSinkConnectorConfig.TOPIC_SCHEMA_IGNORE_CONFIG);
      Map<String, TopicConfig> topicConfigs = constructTopicConfig(topicIndex, topicIgnoreKey, topicIgnoreSchema);
      Map<String, String> topicToIndexMap = parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
      Set<String> topicIgnoreKey = new HashSet<>(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_KEY_IGNORE_CONFIG));
      Set<String> topicIgnoreSchema =  new HashSet<>(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_SCHEMA_IGNORE_CONFIG));

      long flushTimeoutMs = config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
      int maxBufferedRecords = config.getInt(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
@@ -86,9 +85,9 @@ public class ElasticsearchSinkTask extends SinkTask {

      ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
          .setType(type)
          .setIgnoreKey(ignoreKey)
          .setIgnoreSchema(ignoreSchema)
          .setTopicConfigs(topicConfigs)
          .setIgnoreKey(ignoreKey, topicIgnoreKey)
          .setIgnoreSchema(ignoreSchema, topicIgnoreSchema)
          .setTopicToIndexMap(topicToIndexMap)
          .setFlushTimoutMs(flushTimeoutMs)
          .setMaxBufferedRecords(maxBufferedRecords)
          .setMaxInFlightRequests(maxInFlightRequests)
@@ -111,7 +110,7 @@ public class ElasticsearchSinkTask extends SinkTask {
    for (TopicPartition tp : partitions) {
      topics.add(tp.topic());
    }
    writer.createIndices(topics);
    writer.createIndicesForTopics(topics);
  }

  @Override
@@ -153,18 +152,4 @@ public class ElasticsearchSinkTask extends SinkTask {
    return map;
  }

  /**
   * Builds the per-topic configuration map from the raw config lists: each topic that has
   * an entry in the topic->type mapping gets a TopicConfig carrying its type plus whether
   * its key and/or schema should be ignored.
   */
  private Map<String, TopicConfig> constructTopicConfig(List<String> topicType, List<String> topicIgnoreKey, List<String> topicIgnoreSchema) {
    final Map<String, String> typeByTopic = parseMapConfig(topicType);
    final Set<String> keyIgnored = new HashSet<>(topicIgnoreKey);
    final Set<String> schemaIgnored = new HashSet<>(topicIgnoreSchema);

    final Map<String, TopicConfig> result = new HashMap<>();
    for (Map.Entry<String, String> entry : typeByTopic.entrySet()) {
      final String topic = entry.getKey();
      result.put(topic, new TopicConfig(entry.getValue(), keyIgnored.contains(topic), schemaIgnored.contains(topic)));
    }
    return result;
  }

}
+69 −26
Original line number Diff line number Diff line
@@ -37,26 +37,29 @@ import io.searchbox.client.JestResult;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.IndicesExists;

// TODO: Use offset as external version to fence requests with lower version.
public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);

  private final JestClient client;
  private final String type;
  private final boolean ignoreKey;
  private final Set<String> ignoreKeyTopics;
  private final boolean ignoreSchema;
  private final Map<String, TopicConfig> topicConfigs;
  private final Set<String> ignoreSchemaTopics;
  private final Map<String, String> topicToIndexMap;
  private final long flushTimeoutMs;

  private final Set<String> mappings;
  private final BulkProcessor<IndexableRecord, ?> bulkProcessor;

  private final Set<String> existingMappings;

  ElasticsearchWriter(
      JestClient client,
      String type,
      boolean ignoreKey,
      Set<String> ignoreKeyTopics,
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
      Set<String> ignoreSchemaTopics,
      Map<String, String> topicToIndexMap,
      long flushTimeoutMs,
      int maxBufferedRecords,
      int maxInFlightRequests,
@@ -68,12 +71,12 @@ public class ElasticsearchWriter {
    this.client = client;
    this.type = type;
    this.ignoreKey = ignoreKey;
    this.ignoreKeyTopics = ignoreKeyTopics;
    this.ignoreSchema = ignoreSchema;
    this.topicConfigs = topicConfigs == null ? Collections.<String, TopicConfig>emptyMap() : topicConfigs;
    this.ignoreSchemaTopics = ignoreSchemaTopics;
    this.topicToIndexMap = topicToIndexMap;
    this.flushTimeoutMs = flushTimeoutMs;

    mappings = new HashSet<>();

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        new BulkIndexingClient(client),
@@ -84,14 +87,18 @@ public class ElasticsearchWriter {
        maxRetries,
        retryBackoffMs
    );

    existingMappings = new HashSet<>();
  }

  public static class Builder {
    private final JestClient client;
    private String type;
    private boolean ignoreKey = false;
    private Set<String> ignoreKeyTopics = Collections.emptySet();
    private boolean ignoreSchema = false;
    private Map<String, TopicConfig> topicConfigs = new HashMap<>();
    private Set<String> ignoreSchemaTopics = Collections.emptySet();
    private Map<String, String> topicToIndexMap = new HashMap<>();
    private long flushTimeoutMs;
    private int maxBufferedRecords;
    private int maxInFlightRequests;
@@ -109,18 +116,20 @@ public class ElasticsearchWriter {
      return this;
    }

    public Builder setIgnoreKey(boolean ignoreKey) {
    public Builder setIgnoreKey(boolean ignoreKey, Set<String> ignoreKeyTopics) {
      this.ignoreKey = ignoreKey;
      this.ignoreKeyTopics = ignoreKeyTopics;
      return this;
    }

    public Builder setIgnoreSchema(boolean ignoreSchema) {
    public Builder setIgnoreSchema(boolean ignoreSchema, Set<String> ignoreSchemaTopics) {
      this.ignoreSchema = ignoreSchema;
      this.ignoreSchemaTopics = ignoreSchemaTopics;
      return this;
    }

    public Builder setTopicConfigs(Map<String, TopicConfig> topicConfigs) {
      this.topicConfigs = topicConfigs;
    public Builder setTopicToIndexMap(Map<String, String> topicToIndexMap) {
      this.topicToIndexMap = topicToIndexMap;
      return this;
    }

@@ -161,14 +170,44 @@ public class ElasticsearchWriter {

    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs
          client,
          type,
          ignoreKey,
          ignoreKeyTopics,
          ignoreSchema,
          ignoreSchemaTopics,
          topicToIndexMap,
          flushTimeoutMs,
          maxBufferedRecords,
          maxInFlightRequests,
          batchSize,
          lingerMs,
          maxRetry,
          retryBackoffMs
      );
    }
  }

  /**
   * Converts each sink record to an indexable record and hands it to the bulk processor.
   *
   * Per-topic ignore-key/ignore-schema overrides take precedence over the global flags.
   * When the schema is honored, the index mapping is created on first encounter of the
   * index (tracked in existingMappings to avoid repeated existence checks).
   *
   * @param records the records to write
   * @throws ConnectException if the mapping cannot be initialized
   */
  public void write(Collection<SinkRecord> records) {
    for (SinkRecord sinkRecord : records) {
      final String index = sinkRecord.topic();
      final boolean ignoreKey = ignoreKeyTopics.contains(sinkRecord.topic()) || this.ignoreKey;
      final boolean ignoreSchema = ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;

      if (!ignoreSchema && !existingMappings.contains(index)) {
        try {
          if (!Mapping.doesMappingExist(client, index, type)) {
            Mapping.createMapping(client, index, type, sinkRecord.valueSchema());
          }
        } catch (IOException e) {
          // FIXME: concurrent tasks could attempt to create the mapping and one of the requests may fail
          throw new ConnectException("Failed to initialize mapping for index: " + index, e);
        }
        existingMappings.add(index);
      }

      final IndexableRecord indexableRecord = DataConverter.convertRecord(sinkRecord, index, type, ignoreKey, ignoreSchema);

      bulkProcessor.add(indexableRecord, flushTimeoutMs);
    }
  }
@@ -201,17 +240,8 @@ public class ElasticsearchWriter {
    }
  }

  public void createIndices(Set<String> assignedTopics) {
    Set<String> indices = new HashSet<>();
    for (String topic : assignedTopics) {
      final TopicConfig topicConfig = topicConfigs.get(topic);
      if (topicConfig != null) {
        indices.add(topicConfig.getIndex());
      } else {
        indices.add(topic);
      }
    }
    for (String index : indices) {
  public void createIndicesForTopics(Set<String> assignedTopics) {
    for (String index : indicesForTopics(assignedTopics)) {
      if (!indexExists(index)) {
        CreateIndex createIndex = new CreateIndex.Builder(index).build();
        try {
@@ -226,4 +256,17 @@ public class ElasticsearchWriter {
    }
  }

  /**
   * Resolves each assigned topic to its configured index name, falling back to the
   * topic name itself when no explicit mapping exists.
   */
  private Set<String> indicesForTopics(Set<String> assignedTopics) {
    final Set<String> indices = new HashSet<>();
    for (String topic : assignedTopics) {
      final String mapped = topicToIndexMap.get(topic);
      indices.add(mapped == null ? topic : mapped);
    }
    return indices;
  }

}
+16 −22
Original line number Diff line number Diff line
@@ -16,34 +16,28 @@

package io.confluent.connect.elasticsearch;

import io.searchbox.core.Index;

public class IndexableRecord {

  private final String index;
  private final String type;
  private final String id;
  private final String payload;
  public final Key key;
  public final String payload;
  public final long version;

  public IndexableRecord(String index, String type, String id, String payload) {
    this.index = index;
    this.type = type;
    this.id = id;
  public IndexableRecord(Key key, String payload, long version) {
    this.key = key;
    this.version = version;
    this.payload = payload;
  }

  public String getIndex() {
    return index;
  }

  public String getType() {
    return type;
  }

  public String getId() {
    return id;
  }

  public String getPayload() {
    return payload;
  public Index toIndexRequest() {
    return new Index.Builder(payload)
        .index(key.index)
        .type(key.type)
        .id(key.id)
        .setParameter("version_type", "external")
        .setParameter("version", version)
        .build();
  }

}
Loading