Commit d6c7baf6 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

BulkProcessor refactor

Fixes #14, CC-306, CC-307
parent 3691c0fa
Loading
Loading
Loading
Loading
+96 −0
Original line number Diff line number Diff line
@@ -13,83 +13,84 @@
 * License for the specific language governing permissions and limitations under
 * the License.
 **/
package io.confluent.connect.elasticsearch;

package io.confluent.connect.elasticsearch.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import io.confluent.connect.elasticsearch.bulk.BulkClient;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResultHandler;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;

public class HttpClient implements Client<Response> {
public class BulkIndexingClient implements BulkClient<IndexingRequest, Bulk> {

  private static final Logger log = LoggerFactory.getLogger(HttpClient.class);
  private ObjectMapper objectMapper = new ObjectMapper();
  private final JestClient jestClient;
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  private static final Set<String> NON_RETRIABLE_ERROR_TYPES = Collections.singleton("mapper_parse_exception");

  public HttpClient(JestClient jestClient) {
    this.jestClient = jestClient;
  }
  private final JestClient client;

  @Override
  public void execute(RecordBatch batch, Callback<Response> callback) {
    Bulk bulk = constructBulk(batch, callback);
    jestClient.executeAsync(bulk, new CallbackHandler(callback));
  public BulkIndexingClient(JestClient client) {
    this.client = client;
  }

  @Override
  public void close() {
    // We shutdown the JEST client when sink tasks are stopped.
  }

  private Bulk constructBulk(RecordBatch batch, Callback<Response> callback) {
    Bulk.Builder builder = new Bulk.Builder();
    List<ESRequest> requests = batch.requests();
    for (ESRequest request: requests) {
      JsonNode data = null;
      try {
        data = objectMapper.readTree(request.getPayload());
      } catch (IOException e) {
        callback.onFailure(e);
      }
      Index index = new Index.Builder(data.toString())
  public Bulk bulkRequest(List<IndexingRequest> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexingRequest request : batch) {
      builder.addAction(
          new Index.Builder(request.getPayload())
              .index(request.getIndex())
              .type(request.getType())
              .id(request.getId())
          .build();
      builder.addAction(index);
              .build()
      );
    }
    return builder.build();
  }

  private static class CallbackHandler implements JestResultHandler<BulkResult> {
  @Override
  public BulkResponse execute(Bulk bulk) throws IOException {
    return toBulkResponse(client.execute(bulk));
  }

    private Callback<Response> callback;
  private static BulkResponse toBulkResponse(BulkResult result) {
    if (result.isSucceeded()) {
      return BulkResponse.success();
    }

    public CallbackHandler(Callback<Response> callback) {
      this.callback = callback;
    final List<BulkResult.BulkResultItem> failedItems = result.getFailedItems();
    if (failedItems.isEmpty()) {
      return BulkResponse.failure(true, result.getErrorMessage());
    }

    @Override
    public void completed(BulkResult result) {
      log.debug("Request completed with result: {}", result);
      callback.onResponse(new Response(result));
    boolean retriable = true;
    final List<String> errors = new ArrayList<>(failedItems.size());
    for (BulkResult.BulkResultItem failedItem : failedItems) {
      errors.add(failedItem.error);
      retriable &= isRetriableError(failedItem.error);
    }
    return BulkResponse.failure(retriable, errors.toString());
  }

    @Override
    public void failed(Exception e) {
      log.debug("Request failed with exception: {}", e.getMessage());
      callback.onFailure(e);
  private static boolean isRetriableError(String error) {
    if (error != null && !error.trim().isEmpty()) {
      try {
        final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(error);
        return !NON_RETRIABLE_ERROR_TYPES.contains(parsedError.get("type").asText());
      } catch (IOException e) {
        return true;
      }
    }
    return true;
  }

}
+8 −27
Original line number Diff line number Diff line
@@ -33,13 +33,13 @@ import org.apache.kafka.connect.storage.Converter;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import io.confluent.connect.elasticsearch.internals.ESRequest;
import io.searchbox.client.JestClient;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
@@ -53,14 +53,6 @@ public class DataConverter {
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  /**
   * Convert the key to the string representation.
   *
   * @param key The key of a SinkRecord.
   * @param keySchema The key schema.
   * @return The string representation of the key.
   * @throws ConnectException if the key is null.
   */
  public static String convertKey(Object key, Schema keySchema) {
    if (key == null) {
      throw new ConnectException("Key is used as document id and can not be null.");
@@ -86,26 +78,14 @@ public class DataConverter {
    }
  }

  /**
   * Convert a SinkRecord to an IndexRequest.
   *
   * @param record The SinkRecord to be converted.
   * @param client The client to connect to Elasticsearch.
   * @param ignoreKey Whether to ignore the key during indexing.
   * @param ignoreSchema Whether to ignore the schema during indexing.
   * @param topicConfigs The map of per topic configs.
   * @param mappings The mapping cache.
   * @return The converted IndexRequest.
   */

  public static ESRequest convertRecord(
  public static IndexingRequest convertRecord(
      SinkRecord record,
      String type,
      JestClient client,
      boolean ignoreKey,
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
      Set<String> mappings) {
      Set<String> mappingCache) {

    String topic = record.topic();
    int partition = record.kafkaPartition();
@@ -139,9 +119,9 @@ public class DataConverter {
    }

    try {
      if (!topicIgnoreSchema && !mappings.contains(index) && !Mapping.doesMappingExist(client, index, type, mappings)) {
      if (!topicIgnoreSchema && !mappingCache.contains(index) && !Mapping.doesMappingExist(client, index, type, mappingCache)) {
        Mapping.createMapping(client, index, type, valueSchema);
        mappings.add(index);
        mappingCache.add(index);
      }
    } catch (IOException e) {
      // TODO: It is possible that two clients are creating the mapping at the same time and
@@ -160,8 +140,9 @@ public class DataConverter {
      newValue = value;
    }

    byte[] json = JSON_CONVERTER.fromConnectData(topic, newSchema, newValue);
    return new ESRequest(index, type, id, json);
    String payload = new String(JSON_CONVERTER.fromConnectData(topic, newSchema, newValue), StandardCharsets.UTF_8);

    return new IndexingRequest(index, type, id, payload);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
+3 −4
Original line number Diff line number Diff line
@@ -65,9 +65,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static final String MAX_BUFFERED_RECORDS_CONFIG = "max.buffered.records";
  private static final String MAX_BUFFERED_RECORDS_DOC =
      "Approximately the max number of records each task will buffer. This config controls the memory usage for each task. When the number of "
      + "buffered records is larger than this value, the partitions assigned to this task will be paused.";
  private static final long MAX_BUFFERED_RECORDS_DEFAULT = 20000;
      "Approximately the max number of records each task will buffer. This config controls the memory usage for each task.";
  private static final int MAX_BUFFERED_RECORDS_DEFAULT = 20000;
  private static final String MAX_BUFFERED_RECORDS_DISPLAY = "Max Number of Records to Buffer";

  public static final String BATCH_SIZE_CONFIG = "batch.size";
@@ -135,7 +134,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, RETRY_BACKOFF_MS_DEFAULT, Importance.LOW, RETRY_BACKOFF_MS_DOC, CONNECTOR_GROUP, 11, Width.SHORT, RETRY_BACKOFF_MS_DISPLAY)
        .define(MAX_RETRIES_CONFIG, Type.INT, MAX_RETRIES_DEFAULT, Importance.LOW, MAX_RETRIES_DOC, CONNECTOR_GROUP, 12, Width.SHORT, MAX_RETRIES_DISPLAY)
        .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.LOW, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 13, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
        .define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.LOW, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 14, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY);
        .define(MAX_BUFFERED_RECORDS_CONFIG, Type.INT, MAX_BUFFERED_RECORDS_DEFAULT, Importance.LOW, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 14, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY);
  }

  static ConfigDef config = baseConfigDef();
+2 −2
Original line number Diff line number Diff line
@@ -68,7 +68,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      Map<String, TopicConfig> topicConfigs = constructTopicConfig(topicIndex, topicIgnoreKey, topicIgnoreSchema);

      long flushTimeoutMs = config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
      long maxBufferedRecords = config.getLong(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
      int maxBufferedRecords = config.getInt(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
      int batchSize = config.getInt(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
      long lingerMs = config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
      int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
+34 −155
Original line number Diff line number Diff line
@@ -16,8 +16,8 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,55 +30,27 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import io.confluent.connect.elasticsearch.internals.BulkProcessor;
import io.confluent.connect.elasticsearch.internals.ESRequest;
import io.confluent.connect.elasticsearch.internals.HttpClient;
import io.confluent.connect.elasticsearch.internals.Listener;
import io.confluent.connect.elasticsearch.internals.RecordBatch;
import io.confluent.connect.elasticsearch.internals.Response;
import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
import io.searchbox.action.Action;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.IndicesExists;

/**
 * The ElasticsearchWriter handles connections to Elasticsearch, sending data and flush.
 * Transport client is used to send requests to Elasticsearch cluster. Requests are batched
 * when sending to Elasticsearch. To ensure delivery guarantee and order, we retry in case of
 * failures for a batch.
 *
 * Currently, we only send out requests to Elasticsearch when flush is called, which is not
 * desirable from the latency point of view.
 *
 * TODO: Use offset as external version to fence requests with lower version.
 */
// TODO: Use offset as external version to fence requests with lower version.
public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);

  private final JestClient client;
  private final BulkProcessor bulkProcessor;
  private final String type;
  private final boolean ignoreKey;
  private final boolean ignoreSchema;
  private final Map<String, TopicConfig> topicConfigs;
  private final long flushTimeoutMs;
  private final long maxBufferedRecords;

  private final Set<String> mappings;
  private final BulkProcessor<IndexingRequest, ?> bulkProcessor;

  /**
   * ElasticsearchWriter constructor
   * @param client The client to connect to Elasticsearch.
   * @param type The type to use when writing to Elasticsearch.
   * @param ignoreKey Whether to ignore key during indexing.
   * @param ignoreSchema Whether to ignore schema during indexing.
   * @param topicConfigs The map of per topic configs.
   * @param flushTimeoutMs The flush timeout.
   * @param maxBufferedRecords The max number of buffered records.
   * @param maxInFlightRequests The max number of inflight requests allowed.
   * @param batchSize Approximately the max number of records each writer will buffer.
   * @param lingerMs The time to wait before sending a batch.
   */
  ElasticsearchWriter(
      JestClient client,
      String type,
@@ -86,28 +58,32 @@ public class ElasticsearchWriter {
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
      long flushTimeoutMs,
      long maxBufferedRecords,
      int maxBufferedRecords,
      int maxInFlightRequests,
      int batchSize,
      long lingerMs,
      int maxRetry,
      long retryBackoffMs) {

      int maxRetries,
      long retryBackoffMs
  ) {
    this.client = client;
    this.type = type;
    this.ignoreKey = ignoreKey;
    this.ignoreSchema = ignoreSchema;

    this.topicConfigs = topicConfigs == null ? Collections.<String, TopicConfig>emptyMap() : topicConfigs;

    this.flushTimeoutMs = flushTimeoutMs;
    this.maxBufferedRecords  = maxBufferedRecords;

    // Start the BulkProcessor
    bulkProcessor = new BulkProcessor(new HttpClient(client), maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, createDefaultListener());

    //Create mapping cache
    mappings = new HashSet<>();

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        new BulkIndexingClient(client),
        maxBufferedRecords,
        maxInFlightRequests,
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
    );
  }

  public static class Builder {
@@ -117,166 +93,88 @@ public class ElasticsearchWriter {
    private boolean ignoreSchema = false;
    private Map<String, TopicConfig> topicConfigs = new HashMap<>();
    private long flushTimeoutMs;
    private long maxBufferedRecords;
    private int maxBufferedRecords;
    private int maxInFlightRequests;
    private int batchSize;
    private long lingerMs;
    private int maxRetry;
    private long retryBackoffMs;

    /**
     * Constructor of ElasticsearchWriter Builder.
     * @param client The client to connect to Elasticsearch.
     */
    public Builder(JestClient client) {
      this.client = client;
    }

    /**
     * Set the index.
     * @param type The type to use for each index.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setType(String type) {
      this.type = type;
      return this;
    }

    /**
     * Set whether to ignore key during indexing.
     * @param ignoreKey Whether to ignore key.
     * @return an instance of ElasticsearchWriter Builder.
     */

    public Builder setIgnoreKey(boolean ignoreKey) {
      this.ignoreKey = ignoreKey;
      return this;
    }

    /**
     * Set whether to ignore schema during indexing.
     * @param ignoreSchema Whether to ignore key.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setIgnoreSchema(boolean ignoreSchema) {
      this.ignoreSchema = ignoreSchema;
      return this;
    }

    /**
     * Set per topic configurations.
     * @param topicConfigs The map of per topic configuration.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setTopicConfigs(Map<String, TopicConfig> topicConfigs) {
      this.topicConfigs = topicConfigs;
      return this;
    }

    /**
     * Set the flush timeout.
     * @param flushTimeoutMs The flush timeout in milliseconds.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setFlushTimoutMs(long flushTimeoutMs) {
      this.flushTimeoutMs = flushTimeoutMs;
      return this;
    }

    /**
     * Set the max number of records to buffer for each writer.
     * @param maxBufferedRecords The max number of buffered records.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxBufferedRecords(long maxBufferedRecords) {
    public Builder setMaxBufferedRecords(int maxBufferedRecords) {
      this.maxBufferedRecords = maxBufferedRecords;
      return this;
    }

    /**
     * Set the max number of inflight requests.
     * @param maxInFlightRequests The max allowed number of inflight requests.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxInFlightRequests(int maxInFlightRequests) {
      this.maxInFlightRequests = maxInFlightRequests;
      return this;
    }

    /**
     * Set the number of requests to process as a batch when writing.
     * to Elasticsearch.
     * @param batchSize the size of each batch.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setBatchSize(int batchSize) {
      this.batchSize = batchSize;
      return this;
    }

    /**
     * Set the linger time.
     * @param lingerMs The linger time to use in milliseconds.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setLingerMs(long lingerMs) {
      this.lingerMs = lingerMs;
      return this;
    }

    /**
     * Set the max retry for a batch
     * @param maxRetry The number of max retry.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxRetry(int maxRetry) {
      this.maxRetry = maxRetry;
      return this;
    }

    /**
     * Set the retry backoff.
     * @param retryBackoffMs The retry backoff in milliseconds.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setRetryBackoffMs(long retryBackoffMs) {
      this.retryBackoffMs = retryBackoffMs;
      return this;
    }

    /**
     * Build the ElasticsearchWriter.
     * @return an instance of ElasticsearchWriter.
     */
    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs);
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs
      );
    }
  }

  public void write(Collection<SinkRecord> records) {
    if (bulkProcessor.getException() != null) {
      throw new ConnectException("BulkProcessor failed with non-retriable exception", bulkProcessor.getException());
    }
    if (bulkProcessor.getTotalBufferedRecords() + records.size() > maxBufferedRecords) {
      throw new RetriableException("Exceeded max number of buffered records: " + maxBufferedRecords);
    }
    for (SinkRecord record : records) {
      ESRequest request = DataConverter.convertRecord(record, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      IndexingRequest request = DataConverter.convertRecord(record, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(request);
    }
  }

  public void flush() {
    try {
      if (!bulkProcessor.flush(flushTimeoutMs)) {
        throw new ConnectException("Cannot finish flush messages within " + flushTimeoutMs);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Throwable t) {
      throw new ConnectException("Flush failed with non retriable exception.", t);
    }
    bulkProcessor.flush(flushTimeoutMs);
  }

  public void start() {
@@ -284,14 +182,13 @@ public class ElasticsearchWriter {
  }

  public void stop() {
    bulkProcessor.stop();
    try {
      bulkProcessor.awaitStop(flushTimeoutMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Throwable t) {
      throw new ConnectException("Close failed with non retriable exception", t);
      bulkProcessor.flush(flushTimeoutMs);
    } catch (Exception e) {
      log.warn("Failed to flush during stop", e);
    }
    bulkProcessor.stop();
    bulkProcessor.awaitStop(flushTimeoutMs);
  }

  private boolean indexExists(String index) {
@@ -329,22 +226,4 @@ public class ElasticsearchWriter {
    }
  }

  private Listener createDefaultListener() {
    return new Listener() {
      @Override
      public void beforeBulk(long executionId, RecordBatch batch) {

      }

      @Override
      public void afterBulk(long executionId, RecordBatch batch, Response response) {

      }

      @Override
      public void afterBulk(long executionId, RecordBatch batch, Throwable failure) {

      }
    };
  }
}
Loading