Commit ad8ed81d authored by Shikhar Bhushan
Browse files

CC-312: fence writes with offset as ES document version

Use version_type=external and ignore version_conflict_engine_exception errors in the response.
parent d544bc34
Loading
Loading
Loading
Loading
+34 −34
Original line number Diff line number Diff line
@@ -18,23 +18,24 @@ package io.confluent.connect.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import io.confluent.connect.elasticsearch.bulk.BulkClient;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;

public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {

  private static final Logger LOG = LoggerFactory.getLogger(BulkIndexingClient.class);

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  private static final Set<String> NON_RETRIABLE_ERROR_TYPES = Collections.singleton("mapper_parse_exception");

  private final JestClient client;

@@ -45,52 +46,51 @@ public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {
  @Override
  public Bulk bulkRequest(List<IndexableRecord> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexableRecord request : batch) {
      builder.addAction(
          new Index.Builder(request.getPayload())
              .index(request.getIndex())
              .type(request.getType())
              .id(request.getId())
              .build()
      );
    for (IndexableRecord record : batch) {
      builder.addAction(record.toIndexRequest());
    }
    return builder.build();
  }

  @Override
  public BulkResponse execute(Bulk bulk) throws IOException {
    return toBulkResponse(client.execute(bulk));
  }
    final BulkResult result = client.execute(bulk);

  private static BulkResponse toBulkResponse(BulkResult result) {
    if (result.isSucceeded()) {
      return BulkResponse.success();
    }

    final List<BulkResult.BulkResultItem> failedItems = result.getFailedItems();
    if (failedItems.isEmpty()) {
      return BulkResponse.failure(true, result.getErrorMessage());
    }

    boolean retriable = true;
    final List<String> errors = new ArrayList<>(failedItems.size());
    for (BulkResult.BulkResultItem failedItem : failedItems) {
      errors.add(failedItem.error);
      retriable &= isRetriableError(failedItem.error);

    final List<Key> versionConflicts = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    for (BulkResult.BulkResultItem item : result.getItems()) {
      if (item.error != null) {
        final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(item.error);
        final String errorType = parsedError.get("type").asText("");
        if ("version_conflict_engine_exception".equals(errorType)) {
          versionConflicts.add(new Key(item.index, item.type, item.id));
        } else if ("mapper_parse_exception".equals(errorType)) {
          retriable = false;
          errors.add(item.error);
        } else {
          errors.add(item.error);
        }
      }
    return BulkResponse.failure(retriable, errors.toString());
    }

  private static boolean isRetriableError(String error) {
    if (error != null && !error.trim().isEmpty()) {
      try {
        final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(error);
        return !NON_RETRIABLE_ERROR_TYPES.contains(parsedError.get("type").asText());
      } catch (IOException e) {
        return true;
    if (!versionConflicts.isEmpty()) {
      LOG.warn("Ignoring version conflicts for items: {}", versionConflicts);
      if (errors.isEmpty()) {
        // The only errors were version conflicts
        return BulkResponse.success();
      }
    }
    return true;

    final String errorInfo = errors.isEmpty() ? result.getErrorMessage() : errors.toString();

    return BulkResponse.failure(retriable, errorInfo);
  }

}
+1 −1
Original line number Diff line number Diff line
@@ -142,7 +142,7 @@ public class DataConverter {

    String payload = new String(JSON_CONVERTER.fromConnectData(topic, newSchema, newValue), StandardCharsets.UTF_8);

    return new IndexableRecord(index, type, id, payload);
    return new IndexableRecord(new Key(index, type, id), payload, offset);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
+0 −1
Original line number Diff line number Diff line
@@ -37,7 +37,6 @@ import io.searchbox.client.JestResult;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.IndicesExists;

// TODO: Use offset as external version to fence requests with lower version.
public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);

+16 −22
Original line number Diff line number Diff line
@@ -16,34 +16,28 @@

package io.confluent.connect.elasticsearch;

import io.searchbox.core.Index;

public class IndexableRecord {

  private final String index;
  private final String type;
  private final String id;
  private final String payload;
  public final Key key;
  public final String payload;
  public final long version;

  public IndexableRecord(String index, String type, String id, String payload) {
    this.index = index;
    this.type = type;
    this.id = id;
  public IndexableRecord(Key key, String payload, long version) {
    this.key = key;
    this.version = version;
    this.payload = payload;
  }

  public String getIndex() {
    return index;
  }

  public String getType() {
    return type;
  }

  public String getId() {
    return id;
  }

  public String getPayload() {
    return payload;
  public Index toIndexRequest() {
    return new Index.Builder(payload)
        .index(key.index)
        .type(key.type)
        .id(key.id)
        .setParameter("version_type", "external")
        .setParameter("version", version)
        .build();
  }

}
+56 −0
Original line number Diff line number Diff line
/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 **/
package io.confluent.connect.elasticsearch;

import java.util.Objects;

public class Key {

  public final String index;
  public final String type;
  public final String id;

  public Key(String index, String type, String id) {
    this.index = index;
    this.type = type;
    this.id = id;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    Key that = (Key) o;
    return Objects.equals(index, that.index) &&
           Objects.equals(type, that.type) &&
           Objects.equals(id, that.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(index, type, id);
  }

  @Override
  public String toString() {
    return String.format("Key{%s/%s/%s}", index, type, id);
  }

}
Loading