Loading src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java +12 −10 Original line number Diff line number Diff line Loading @@ -138,9 +138,21 @@ public class BulkProcessor<R, B> { // when linger time has already elapsed, we still have to ensure the other submission conditions hence the wait(0) in that case wait(Math.max(0, lingerMs - elapsedMs)); } // at this point, either stopRequested or canSubmit return stopRequested ? null : submitBatch(); } private synchronized Future<BulkResponse> submitBatch() { assert !unsentRecords.isEmpty(); final int batchableSize = Math.min(batchSize, unsentRecords.size()); final List<R> batch = new ArrayList<>(batchableSize); for (int i = 0; i < batchableSize; i++) { batch.add(unsentRecords.removeFirst()); } inFlightRecords += batchableSize; return executor.submit(new BulkTask(batch)); } /** * Submission is possible when there are unsent records and: * <ul> Loading Loading @@ -195,16 +207,6 @@ public class BulkProcessor<R, B> { } } private synchronized Future<BulkResponse> submitBatch() { final int batchableSize = Math.min(batchSize, unsentRecords.size()); final List<R> batch = new ArrayList<>(batchableSize); for (int i = 0; i < batchableSize; i++) { batch.add(unsentRecords.removeFirst()); } inFlightRecords += batchableSize; return executor.submit(new BulkTask(batch)); } /** * @return whether {@link #stop()} has been requested */ Loading Loading
src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java +12 −10 Original line number Diff line number Diff line Loading @@ -138,9 +138,21 @@ public class BulkProcessor<R, B> { // when linger time has already elapsed, we still have to ensure the other submission conditions hence the wait(0) in that case wait(Math.max(0, lingerMs - elapsedMs)); } // at this point, either stopRequested or canSubmit return stopRequested ? null : submitBatch(); } private synchronized Future<BulkResponse> submitBatch() { assert !unsentRecords.isEmpty(); final int batchableSize = Math.min(batchSize, unsentRecords.size()); final List<R> batch = new ArrayList<>(batchableSize); for (int i = 0; i < batchableSize; i++) { batch.add(unsentRecords.removeFirst()); } inFlightRecords += batchableSize; return executor.submit(new BulkTask(batch)); } /** * Submission is possible when there are unsent records and: * <ul> Loading Loading @@ -195,16 +207,6 @@ public class BulkProcessor<R, B> { } } private synchronized Future<BulkResponse> submitBatch() { final int batchableSize = Math.min(batchSize, unsentRecords.size()); final List<R> batch = new ArrayList<>(batchableSize); for (int i = 0; i < batchableSize; i++) { batch.add(unsentRecords.removeFirst()); } inFlightRecords += batchableSize; return executor.submit(new BulkTask(batch)); } /** * @return whether {@link #stop()} has been requested */ Loading