Commit 66803ce6 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

CC-301: Lower default values of `max.buffered.records` and `batch.size`

Also some slight cleanup around throwing `RetriableException` when `max.buffered.records` is hit
parent f2b42848
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -67,12 +67,12 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final String MAX_BUFFERED_RECORDS_DOC =
      "Approximately the max number of records each task will buffer. This config controls the memory usage for each task. When the number of "
      + "buffered records is larger than this value, the partitions assigned to this task will be paused.";
  // Lowered from 100000 in CC-301; this cap bounds per-task memory usage (see doc above).
  private static final long MAX_BUFFERED_RECORDS_DEFAULT = 20000;
  private static final String MAX_BUFFERED_RECORDS_DISPLAY = "Max Number of Records to Buffer";

  public static final String BATCH_SIZE_CONFIG = "batch.size";
  private static final String BATCH_SIZE_DOC = "The number of requests to process as a batch when writing to Elasticsearch.";
  // Lowered from 10000 in CC-301.
  private static final int BATCH_SIZE_DEFAULT = 2000;
  private static final String BATCH_SIZE_DISPLAY = "Batch Size";

  public static final String LINGER_MS_CONFIG = "linger.ms";
+5 −3
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.Converter;
@@ -298,10 +299,11 @@ public class ElasticsearchWriter {

  /**
   * Converts and enqueues the given records onto the bulk processor.
   *
   * @param records records handed to the task by the Connect framework
   * @throws ConnectException if the background BulkProcessor has already failed
   *     with a non-retriable exception (fail fast, preserving the cause)
   * @throws RetriableException if accepting these records would exceed
   *     {@code max.buffered.records}; the framework will retry the delivery later
   */
  public void write(Collection<SinkRecord> records) {
    if (bulkProcessor.getException() != null) {
      throw new ConnectException("BulkProcessor failed with non-retriable exception", bulkProcessor.getException());
    }
    // Backpressure: refuse (retriably) rather than buffer past the configured cap.
    if (bulkProcessor.getTotalBufferedRecords() + records.size() > maxBufferedRecords) {
      throw new RetriableException("Exceeded max number of buffered records: " + maxBufferedRecords);
    }

    for (SinkRecord record: records) {
      ESRequest request = DataConverter.convertRecord(record, type, client, converter, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(request);
+3 −7
Original line number Diff line number Diff line
@@ -15,7 +15,6 @@
 **/
package io.confluent.connect.elasticsearch.internals;

import org.apache.kafka.connect.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -270,17 +269,14 @@ public class BulkProcessor implements Runnable {
    }
  }

  public void exceedMaxBufferedRecords(long maxBufferedRecords, int batchSize) {
    synchronized (requests) {
  public int getTotalBufferedRecords() {
    int total = 0;
    synchronized (requests) {
      for (RecordBatch batch: requests) {
        total += batch.size();
      }
      total += batchSize;
      if (total > maxBufferedRecords) {
        throw new RetriableException("Exceed max number of buffered records");
      }
    }
    return total;
  }

  // visible for testing