Loading src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +2 −2 Original line number Diff line number Diff line Loading @@ -67,12 +67,12 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String MAX_BUFFERED_RECORDS_DOC = "Approximately the max number of records each task will buffer. This config controls the memory usage for each task. When the number of " + "buffered records is larger than this value, the partitions assigned to this task will be paused."; private static final long MAX_BUFFERED_RECORDS_DEFAULT = 100000; private static final long MAX_BUFFERED_RECORDS_DEFAULT = 20000; private static final String MAX_BUFFERED_RECORDS_DISPLAY = "Max Number of Records to Buffer"; public static final String BATCH_SIZE_CONFIG = "batch.size"; private static final String BATCH_SIZE_DOC = "The number of requests to process as a batch when writing to Elasticsearch."; private static final int BATCH_SIZE_DEFAULT = 10000; private static final int BATCH_SIZE_DEFAULT = 2000; private static final String BATCH_SIZE_DISPLAY = "Batch Size"; public static final String LINGER_MS_CONFIG = "linger.ms"; Loading src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java +5 −3 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ package io.confluent.connect.elasticsearch; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.apache.kafka.connect.storage.Converter; Loading Loading @@ -298,10 +299,11 @@ public class ElasticsearchWriter { public void write(Collection<SinkRecord> records) { if (bulkProcessor.getException() != null) { throw new ConnectException("BulkProcessor fails with non reriable exception.", bulkProcessor.getException()); throw new ConnectException("BulkProcessor failed with non-retriable exception", bulkProcessor.getException()); } if (bulkProcessor.getTotalBufferedRecords() + records.size() > maxBufferedRecords) { throw new RetriableException("Exceeded max number of buffered records: " + maxBufferedRecords); } bulkProcessor.exceedMaxBufferedRecords(maxBufferedRecords, records.size()); for (SinkRecord record: records) { ESRequest request = DataConverter.convertRecord(record, type, client, converter, ignoreKey, ignoreSchema, topicConfigs, mappings); bulkProcessor.add(request); Loading src/main/java/io/confluent/connect/elasticsearch/internals/BulkProcessor.java +3 −7 Original line number Diff line number Diff line Loading @@ -15,7 +15,6 @@ **/ package io.confluent.connect.elasticsearch.internals; import org.apache.kafka.connect.errors.RetriableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Loading Loading @@ -270,17 +269,14 @@ public class BulkProcessor implements Runnable { } } public void exceedMaxBufferedRecords(long maxBufferedRecords, int batchSize) { synchronized (requests) { public int getTotalBufferedRecords() { int total = 0; synchronized (requests) { for (RecordBatch batch: requests) { total += batch.size(); } total += batchSize; if (total > maxBufferedRecords) { throw new RetriableException("Exceed max number of buffered records"); } } return total; } // visible for testing Loading Loading
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +2 −2 Original line number Diff line number Diff line Loading @@ -67,12 +67,12 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String MAX_BUFFERED_RECORDS_DOC = "Approximately the max number of records each task will buffer. This config controls the memory usage for each task. When the number of " + "buffered records is larger than this value, the partitions assigned to this task will be paused."; private static final long MAX_BUFFERED_RECORDS_DEFAULT = 100000; private static final long MAX_BUFFERED_RECORDS_DEFAULT = 20000; private static final String MAX_BUFFERED_RECORDS_DISPLAY = "Max Number of Records to Buffer"; public static final String BATCH_SIZE_CONFIG = "batch.size"; private static final String BATCH_SIZE_DOC = "The number of requests to process as a batch when writing to Elasticsearch."; private static final int BATCH_SIZE_DEFAULT = 10000; private static final int BATCH_SIZE_DEFAULT = 2000; private static final String BATCH_SIZE_DISPLAY = "Batch Size"; public static final String LINGER_MS_CONFIG = "linger.ms"; Loading
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java +5 −3 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ package io.confluent.connect.elasticsearch; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.apache.kafka.connect.storage.Converter; Loading Loading @@ -298,10 +299,11 @@ public class ElasticsearchWriter { public void write(Collection<SinkRecord> records) { if (bulkProcessor.getException() != null) { throw new ConnectException("BulkProcessor fails with non reriable exception.", bulkProcessor.getException()); throw new ConnectException("BulkProcessor failed with non-retriable exception", bulkProcessor.getException()); } if (bulkProcessor.getTotalBufferedRecords() + records.size() > maxBufferedRecords) { throw new RetriableException("Exceeded max number of buffered records: " + maxBufferedRecords); } bulkProcessor.exceedMaxBufferedRecords(maxBufferedRecords, records.size()); for (SinkRecord record: records) { ESRequest request = DataConverter.convertRecord(record, type, client, converter, ignoreKey, ignoreSchema, topicConfigs, mappings); bulkProcessor.add(request); Loading
src/main/java/io/confluent/connect/elasticsearch/internals/BulkProcessor.java +3 −7 Original line number Diff line number Diff line Loading @@ -15,7 +15,6 @@ **/ package io.confluent.connect.elasticsearch.internals; import org.apache.kafka.connect.errors.RetriableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Loading Loading @@ -270,17 +269,14 @@ public class BulkProcessor implements Runnable { } } public void exceedMaxBufferedRecords(long maxBufferedRecords, int batchSize) { synchronized (requests) { public int getTotalBufferedRecords() { int total = 0; synchronized (requests) { for (RecordBatch batch: requests) { total += batch.size(); } total += batchSize; if (total > maxBufferedRecords) { throw new RetriableException("Exceed max number of buffered records"); } } return total; } // visible for testing Loading