Commit 83be4371 authored by Liquan Pei's avatar Liquan Pei
Browse files

Replace BulkProcessor

parent f67a013c
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@
        <confluent.version>3.0.0-SNAPSHOT</confluent.version>
        <kafka.version>0.10.1.0-SNAPSHOT</kafka.version>
        <junit.version>4.12</junit.version>
        <es.version>2.2.1</es.version>
        <es.version>2.3.3</es.version>
        <lucene.version>5.3.1</lucene.version>
        <slf4j.version>1.7.5</slf4j.version>
        <jna.version>4.2.1</jna.version>
+4 −3
Original line number Diff line number Diff line
@@ -29,7 +29,6 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +40,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;

import io.confluent.connect.elasticsearch.internals.ESRequest;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;

@@ -96,7 +97,7 @@ public class DataConverter {
   * @return The converted IndexRequest.
   */

  public static IndexRequest convertRecord(
  public static ESRequest convertRecord(
      SinkRecord record,
      String type,
      Client client,
@@ -160,7 +161,7 @@ public class DataConverter {
    }

    byte[] json = converter.fromConnectData(topic, newSchema, newValue);
    return new IndexRequest(index, type, id).source(json);
    return new ESRequest(index, type, id, json);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
+44 −10
Original line number Diff line number Diff line
@@ -72,9 +72,38 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static final String BATCH_SIZE_CONFIG = "batch.size";
  private static final String BATCH_SIZE_DOC = "The number of requests to process as a batch when writing to Elasticsearch.";
  private static final long BATCH_SIZE_DEFAULT = 10000;
  private static final int BATCH_SIZE_DEFAULT = 10000;
  private static final String BATCH_SIZE_DISPLAY = "Batch Size";

  public static final String LINGER_MS_CONFIG = "linger.ms";
  private static final String LINGER_MS_DOC =
      "The task groups together any records that arrive in between request transmissions into a single batched request. "
      + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the "
      + "tasks may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
      + "of artificial delay. Rather than immediately sending out a record the task will wait for up to the given delay to allow other "
      + "records to be sent so that the sends can be batched together.";
  private static final long LINGER_MS_DEFAULT = 1;
  private static final String LINGER_MS_DISPLAY = "Linger (ms)";

  public static final String MAX_IN_FLIGHT_REQUESTS_CONFIG = "max.in.flight.requests";
  private static final String MAX_IN_FLIGHT_REQUESTS_DOC =
      "The maximum number of incomplete batches each task will send before blocking. Note that if this is set to be greater "
      + "than 1 and there are failed sends, there is a risk of message re-ordering due to retries";
  private static final int MAX_IN_FLIGHT_REQUESTS_DEFAULT = 5;
  private static final String MAX_IN_FLIGHT_REQUESTS_DISPLAY = "Max in Flight Requests";

  public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
  private static final String RETRY_BACKOFF_MS_DOC =
      "The amount of time to wait before attempting to retry a failed batch. "
      + "This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
  private static final long RETRY_BACKOFF_MS_DEFAULT = 100L;
  private static final String RETRY_BACKOFF_MS_DISPLAY = "Retry Backoff (ms)";

  public static final String MAX_RETRY_CONFIG = "max.retry";
  private static final String MAX_RETRY_DOC = "The max allowed number of retries. Allowing retries will potentially change the ordering of records.";
  private static final int MAX_RETRY_DEFAULT = 5;
  private static final String MAX_RETRY_DISPLAY = "Max Retry";

  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  private static final String SCHEMA_IGNORE_DOC =
      "Whether to ignore schemas during indexing. When this is set to true, the schema in `SinkRecord` will be ignored and Elasticsearch will infer the mapping from data. "
@@ -92,15 +121,20 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
    return new ConfigDef()
        .define(TRANSPORT_ADDRESSES_CONFIG, Type.LIST, Importance.HIGH, TRANSPORT_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, TRANSPORT_ADDRESSES_DISPLAY)
        .define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH, TYPE_NAME_DOC, ELASTICSEARCH_GROUP, 2, Width.SHORT, TYPE_NAME_DISPLAY)
        .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 1, Width.SHORT, KEY_IGNORE_DISPLAY)
        .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.MEDIUM, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 2, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
        .define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.MEDIUM, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 3, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY)
        .define(BATCH_SIZE_CONFIG, Type.LONG, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
        .define(TOPIC_INDEX_MAP_CONFIG, Type.LIST, TOPIC_INDEX_MAP_DEFAULT, Importance.LOW,
                TOPIC_INDEX_MAP_DOC, CONNECTOR_GROUP, 5, Width.LONG, TOPIC_INDEX_MAP_DISPLAY)
        .define(TOPIC_KEY_IGNORE_CONFIG, Type.LIST, TOPIC_KEY_IGNORE_DEFAULT, Importance.LOW, TOPIC_KEY_IGNORE_DOC, CONNECTOR_GROUP, 6, Width.LONG, TOPIC_KEY_IGNORE_DISPLAY)
        .define(SCHEMA_IGNORE_CONFIG, Type.BOOLEAN, SCHEMA_IGNORE_DEFAULT, Importance.LOW, SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 7, Width.SHORT, SCHEMA_IGNORE_DISPLAY)
        .define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, TOPIC_SCHEMA_IGNORE_DEFAULT, Importance.LOW, TOPIC_SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 8, Width.LONG, TOPIC_SCHEMA_IGNORE_DISPLAY);
        .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 3, Width.SHORT, KEY_IGNORE_DISPLAY)
        .define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
        .define(MAX_IN_FLIGHT_REQUESTS_CONFIG, Type.INT, MAX_IN_FLIGHT_REQUESTS_DEFAULT, Importance.MEDIUM,
                MAX_IN_FLIGHT_REQUESTS_DOC, CONNECTOR_GROUP, 5, Width.SHORT,
                MAX_IN_FLIGHT_REQUESTS_DISPLAY)
        .define(TOPIC_INDEX_MAP_CONFIG, Type.LIST, TOPIC_INDEX_MAP_DEFAULT, Importance.LOW, TOPIC_INDEX_MAP_DOC, CONNECTOR_GROUP, 6, Width.LONG, TOPIC_INDEX_MAP_DISPLAY)
        .define(TOPIC_KEY_IGNORE_CONFIG, Type.LIST, TOPIC_KEY_IGNORE_DEFAULT, Importance.LOW, TOPIC_KEY_IGNORE_DOC, CONNECTOR_GROUP, 7, Width.LONG, TOPIC_KEY_IGNORE_DISPLAY)
        .define(SCHEMA_IGNORE_CONFIG, Type.BOOLEAN, SCHEMA_IGNORE_DEFAULT, Importance.LOW, SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 8, Width.SHORT, SCHEMA_IGNORE_DISPLAY)
        .define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, TOPIC_SCHEMA_IGNORE_DEFAULT, Importance.LOW, TOPIC_SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 9, Width.LONG, TOPIC_SCHEMA_IGNORE_DISPLAY)
        .define(LINGER_MS_CONFIG, Type.LONG, LINGER_MS_DEFAULT, Importance.LOW, LINGER_MS_DOC, CONNECTOR_GROUP, 10, Width.SHORT, LINGER_MS_DISPLAY)
        .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, RETRY_BACKOFF_MS_DEFAULT, Importance.LOW, RETRY_BACKOFF_MS_DOC, CONNECTOR_GROUP, 11, Width.SHORT, RETRY_BACKOFF_MS_DISPLAY)
        .define(MAX_RETRY_CONFIG, Type.INT, MAX_RETRY_DEFAULT, Importance.LOW, MAX_RETRY_DOC, CONNECTOR_GROUP, 12, Width.SHORT, MAX_RETRY_DISPLAY)
        .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.LOW, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 13, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
        .define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.LOW, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 14, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY);
  }

  static ConfigDef config = baseConfigDef();
+9 −1
Original line number Diff line number Diff line
@@ -83,7 +83,11 @@ public class ElasticsearchSinkTask extends SinkTask {

      long flushTimeoutMs = config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
      long maxBufferedRecords = config.getLong(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
      long batchSize = config.getLong(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
      int batchSize = config.getInt(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
      long lingerMs = config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
      int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
      long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRY_CONFIG);

      if (client != null) {
        this.client = client;
@@ -105,7 +109,11 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setTopicConfigs(topicConfigs)
          .setFlushTimoutMs(flushTimeoutMs)
          .setMaxBufferedRecords(maxBufferedRecords)
          .setMaxInFlightRequests(maxInFlightRequests)
          .setBatchSize(batchSize)
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setContext(context)
          .setConverter(converter);

+96 −139
Original line number Diff line number Diff line
@@ -21,31 +21,22 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.mapper.MapperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import io.confluent.connect.elasticsearch.internals.BulkProcessor;
import io.confluent.connect.elasticsearch.internals.ESRequest;
import io.confluent.connect.elasticsearch.internals.ESResponse;
import io.confluent.connect.elasticsearch.internals.Listener;
import io.confluent.connect.elasticsearch.internals.RecordBatch;

/**
 * The ElasticsearchWriter handles connections to Elasticsearch, sending data and flush.
@@ -56,8 +47,6 @@ import java.util.concurrent.TimeUnit;
 * Currently, we only send out requests to Elasticsearch when flush is called, which is not
 * desirable from the latency point of view.
 *
 * TODO: Replace the Elasticsearch BulkProcessor with our own processor to handle batching and retry.
 *
 * TODO: Use REST instead of transport client.
 *
 * TODO: Use offset as external version to fence requests with lower version.
@@ -69,23 +58,13 @@ public class ElasticsearchWriter {

  private final Client client;
  private final BulkProcessor bulkProcessor;
  private static final int CONCURRENT_REQUESTS = 1;
  private final Semaphore semaphore = new Semaphore(CONCURRENT_REQUESTS);
  private final Queue<SinkRecord> buffer;
  private BulkRequest currentBulkRequest;

  private final String type;
  private final boolean ignoreKey;
  private final boolean ignoreSchema;
  private final Map<String, TopicConfig> topicConfigs;
  private final long flushTimeoutMs;
  private final long maxBufferedRecords;
  private final long batchSize;

  private final SinkTaskContext context;
  private final Set<TopicPartition> assignment;
  private static final Class<? extends Throwable> NON_RETRIABLE_EXCEPTION_CLASS = MapperException.class;
  private boolean canRetry;
  private final Set<String> mappings;

  /**
@@ -97,7 +76,9 @@ public class ElasticsearchWriter {
   * @param topicConfigs The map of per topic configs.
   * @param flushTimeoutMs The flush timeout.
   * @param maxBufferedRecords The max number of buffered records.
   * @param maxInFlightRequests The max number of inflight requests allowed.
   * @param batchSize Approximately the max number of records each writer will buffer.
   * @param lingerMs The time to wait before sending a batch.
   * @param context The SinkTaskContext.
   * @param mock Whether to use mock Elasticsearch client.
   */
@@ -109,7 +90,11 @@ public class ElasticsearchWriter {
      Map<String, TopicConfig> topicConfigs,
      long flushTimeoutMs,
      long maxBufferedRecords,
      long batchSize,
      int maxInFlightRequests,
      int batchSize,
      long lingerMs,
      int maxRetry,
      long retryBackoffMs,
      SinkTaskContext context,
      Converter converter,
      boolean mock) {
@@ -127,76 +112,20 @@ public class ElasticsearchWriter {

    this.flushTimeoutMs = flushTimeoutMs;
    this.maxBufferedRecords  = maxBufferedRecords;
    this.batchSize = batchSize;

    this.context = context;
    this.assignment = context.assignment();
    this.canRetry = true;

    this.currentBulkRequest = new BulkRequest();

    // create index if needed.
    if (!mock) {
      createIndices(topicConfigs);
    }

    // create the buffer
    buffer = new LinkedList<>();

    // Config the JsonConverter
    this.converter = converter;

    // Create the bulkProcessor
    bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {

          @Override
          public void beforeBulk(long executionId, BulkRequest request) {
            log.debug("Before executing the request: {}", request);
            try {
              semaphore.acquire();
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (!response.hasFailures()) {
              log.debug("Finished request with no failures.");
              currentBulkRequest = new BulkRequest();
            } else {
              log.error("Failure:" + response.buildFailureMessage());
              for (BulkItemResponse bulkItemResponse : response.getItems()) {
                if (bulkItemResponse.isFailed()) {
                  Throwable cause = bulkItemResponse.getFailure().getCause();
                  Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
                  if (NON_RETRIABLE_EXCEPTION_CLASS.isAssignableFrom(rootCause.getClass())) {
                    canRetry = false;
                    break;
                  }
                }
              }
            }
            semaphore.release();
          }

          @Override
          public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            log.error("Failure:", failure);
            if (NON_RETRIABLE_EXCEPTION_CLASS.isAssignableFrom(failure.getClass())) {
              canRetry = false;
            }
            semaphore.release();
          }
        })
        .setBulkActions(-1)
        .setBulkSize(new ByteSizeValue(-1))
        .setFlushInterval(null)
        .setConcurrentRequests(CONCURRENT_REQUESTS)
        .setBackoffPolicy(BackoffPolicy.noBackoff())
        .build();
    // Start the BulkProcessor
    bulkProcessor = new BulkProcessor(client, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, createDefaultListener());
    bulkProcessor.start();

    //Create mapping cache
    mappings = new HashSet<>();
@@ -211,7 +140,11 @@ public class ElasticsearchWriter {
    private Map<String, TopicConfig> topicConfigs = new HashMap<>();
    private long flushTimeoutMs;
    private long maxBufferedRecords;
    private long batchSize;
    private int maxInFlightRequests;
    private int batchSize;
    private long lingerMs;
    private int maxRetry;
    private long retryBackoffMs;
    private SinkTaskContext context;
    private Converter converter = ElasticsearchSinkTask.getConverter();
    private boolean mock;
@@ -277,7 +210,7 @@ public class ElasticsearchWriter {

    /**
     * Set the max number of records to buffer for each writer.
     * @param maxBufferedRecords The max number of buffered record.s
     * @param maxBufferedRecords The max number of buffered records.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxBufferedRecords(long maxBufferedRecords) {
@@ -286,16 +219,56 @@ public class ElasticsearchWriter {
    }

    /**
     * Set the number of requests to process as a batch when writing
     * Set the max number of inflight requests.
     * @param maxInFlightRequests The max allowed number of inflight requests.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxInFlightRequests(int maxInFlightRequests) {
      this.maxInFlightRequests = maxInFlightRequests;
      return this;
    }

    /**
     * Set the number of requests to process as a batch when writing.
     * to Elasticsearch.
     * @param batchSize the size of each batch.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setBatchSize(long batchSize) {
    public Builder setBatchSize(int batchSize) {
      this.batchSize = batchSize;
      return this;
    }

    /**
     * Set the linger time in milliseconds.
     *
     * <p>Records that arrive within this window are grouped into a single
     * batched request instead of being sent out immediately (see the
     * {@code linger.ms} connector config).
     * @param lingerMs The linger time to use in milliseconds.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setLingerMs(long lingerMs) {
      this.lingerMs = lingerMs;
      return this;
    }

    /**
     * Set the max number of retries for a failed batch.
     *
     * <p>NOTE: allowing retries will potentially change the ordering of
     * records (see the {@code max.retry} connector config).
     * @param maxRetry The number of max retry.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxRetry(int maxRetry) {
      this.maxRetry = maxRetry;
      return this;
    }

    /**
     * Set the retry backoff in milliseconds.
     *
     * <p>This is the amount of time to wait before retrying a failed batch,
     * which avoids repeatedly sending requests in a tight loop under some
     * failure scenarios (see the {@code retry.backoff.ms} connector config).
     * @param retryBackoffMs The retry backoff in milliseconds.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setRetryBackoffMs(long retryBackoffMs) {
      this.retryBackoffMs = retryBackoffMs;
      return this;
    }

    /**
     * Set the SinkTaskContext
     * @param context The SinkTaskContext.
@@ -332,72 +305,42 @@ public class ElasticsearchWriter {
     */
    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, batchSize, context, converter, mock);
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context, converter, mock);
    }
  }

  public void write(Collection<SinkRecord> records) {

    for (SinkRecord record: records) {
      buffer.add(record);
    }

    if (buffer.size() > maxBufferedRecords) {
      for (TopicPartition tp: assignment) {
        context.pause(tp);
      }
    if (bulkProcessor.getException() != null) {
      throw new ConnectException("BulkProcessor fails with non reriable exception.", bulkProcessor.getException());
    }
    bulkProcessor.exceedMaxBufferedRecords(maxBufferedRecords, records.size());

    if (!currentBulkRequest.requests().isEmpty()) {
      log.debug("We need to retry {}", currentBulkRequest);
      return;
    }

    Iterator<SinkRecord> iter = buffer.iterator();
    int size = 0;
    while (iter.hasNext() && size < batchSize) {
      size++;
      SinkRecord record = iter.next();
      iter.remove();
      IndexRequest request = DataConverter.convertRecord(record, type, client, converter, ignoreKey, ignoreSchema, topicConfigs, mappings);
      currentBulkRequest.add(request);
    for (SinkRecord record: records) {
      ESRequest request = DataConverter.convertRecord(record, type, client, converter, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(request);
    }
  }

  // TODO: fix the logic here. Currently we did not properly handle the case that the data is in the
  // buffer but not yet flushed.
  public void flush() {
    for (ActionRequest request: currentBulkRequest.requests()) {
      bulkProcessor.add(request);
    }
    bulkProcessor.flush();
    try {
      if (semaphore.tryAcquire(CONCURRENT_REQUESTS, flushTimeoutMs, TimeUnit.MILLISECONDS)) {
        log.info("Bulk request finished.");
        if (buffer.size() < maxBufferedRecords) {
          for (TopicPartition tp: assignment) {
            context.resume(tp);
          }
        }
        semaphore.release();
        if (!canRetry) {
          throw new ConnectException("Cannot continue execution.");
        }
      } else {
        // TODO: we want to cancel the current bulk request before submitting the next one
        // Currently, the bulkProcessor thread may be blocked when retry
        log.error("Not able to finish flushing before timeout:" + flushTimeoutMs);
      if (!bulkProcessor.flush(flushTimeoutMs)) {
        throw new ConnectException("Cannot finish flush messages within " + flushTimeoutMs);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Throwable t) {
      throw new ConnectException("Flush failed with non retriable exception.", t);
    }
  }

  public void close() {
    bulkProcessor.stop();
    try {
      bulkProcessor.awaitClose(flushTimeoutMs, TimeUnit.SECONDS);
      bulkProcessor.awaitStop(flushTimeoutMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Throwable t) {
      throw new ConnectException("Close failed with non retriable exception", t);
    }
  }

@@ -435,8 +378,22 @@ public class ElasticsearchWriter {
    }
  }

  // public for testing
  public BulkRequest getCurrentBulkRequest() {
    return currentBulkRequest;
  // Builds the no-op Listener handed to the internal BulkProcessor: all three
  // lifecycle callbacks are intentionally left empty, so batch events are
  // currently ignored here.
  private Listener createDefaultListener() {
    return new Listener() {
      @Override
      public void beforeBulk(long executionId, RecordBatch batch) {
        // Intentionally empty: nothing to do before a batch is sent.
      }

      @Override
      public void afterBulk(long executionId, RecordBatch batch, ESResponse response) {
        // Intentionally empty: successful batches need no follow-up here.
      }

      @Override
      public void afterBulk(long executionId, RecordBatch batch, Throwable failure) {
        // Intentionally empty: failures appear to be surfaced via
        // bulkProcessor.getException() in write() instead — TODO confirm.
      }
    };
  }
}
Loading