Loading src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java +21 −56 Original line number Diff line number Diff line Loading @@ -42,6 +42,8 @@ public class BulkProcessor<R, B> { private static final Logger log = LoggerFactory.getLogger(BulkProcessor.class); private static final AtomicLong BATCH_ID_GEN = new AtomicLong(); private final Time time; private final BulkClient<R, B> bulkClient; private final int maxBufferedRecords; Loading @@ -53,11 +55,6 @@ public class BulkProcessor<R, B> { private final Thread farmer; private final ExecutorService executor; // thread-safe stats private final AtomicLong createdBatches = new AtomicLong(); private final AtomicLong successfulRecords = new AtomicLong(); private final AtomicLong successfulBatches = new AtomicLong(); // thread-safe state, can be mutated safely without synchronization, // but may be part of synchronized(this) wait() conditions so need to notifyAll() on changes private volatile boolean stopRequested = false; Loading Loading @@ -105,7 +102,7 @@ public class BulkProcessor<R, B> { return new ThreadFactory() { @Override public Thread newThread(Runnable r) { final Thread t = new Thread(r, "BulkProcessor-t" + threadCounter.getAndIncrement()); final Thread t = new Thread(r, String.format("BulkProcessor@%d-%d", System.identityHashCode(this), threadCounter.getAndIncrement())); t.setDaemon(true); t.setUncaughtExceptionHandler(uncaughtExceptionHandler); return t; Loading Loading @@ -257,9 +254,12 @@ public class BulkProcessor<R, B> { /** * Add a record, may block upto {@code timeoutMs} if at capacity with respect to {@code maxBufferedRecords}. * * If any task has failed prior to or while blocked in the add, {@link ConnectException} will be thrown with that error. * If any task has failed prior to or while blocked in the add, or if the timeout expires while blocked, {@link ConnectException} will be thrown. */ public synchronized void add(R record, long timeoutMs) { throwIfTerminal(); if (bufferedRecords() >= maxBufferedRecords) { final long addStartTimeMs = time.milliseconds(); for (long elapsedMs = time.milliseconds() - addStartTimeMs; !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() >= maxBufferedRecords; Loading @@ -274,6 +274,8 @@ public class BulkProcessor<R, B> { if (bufferedRecords() >= maxBufferedRecords) { throw new ConnectException("Add timeout expired before buffer availability"); } } unsentRecords.addLast(record); notifyAll(); } Loading Loading @@ -309,7 +311,7 @@ public class BulkProcessor<R, B> { private final class BulkTask implements Callable<BulkResponse> { final long batchId = createdBatches.incrementAndGet(); final long batchId = BATCH_ID_GEN.incrementAndGet(); final List<R> batch; Loading Loading @@ -364,8 +366,6 @@ public class BulkProcessor<R, B> { } private synchronized void onBatchCompletion(int batchSize) { successfulBatches.incrementAndGet(); successfulRecords.addAndGet(batchSize); inFlightRecords -= batchSize; assert inFlightRecords >= 0; notifyAll(); Loading @@ -376,20 +376,6 @@ public class BulkProcessor<R, B> { stop(); } /** * @return count of currently buffered records */ public synchronized int unsentRecords() { return unsentRecords.size(); } /** * @return count of records currently in flight */ public synchronized int inFlightRecords() { return inFlightRecords; } /** * @return sum of unsent and in-flight record counts */ Loading @@ -397,27 +383,6 @@ public class BulkProcessor<R, B> { return unsentRecords.size() + inFlightRecords; } /** * @return count of batches that have been created */ public long createdBatches() { return createdBatches.get(); } /** * @return count of batches successfully executed */ public long successfulBatches() { return successfulBatches.get(); } /** * @return count of records successfully sent */ public long successfulRecords() { return successfulRecords.get(); } private static ConnectException toConnectException(Throwable t) { if (t instanceof ConnectException) { return (ConnectException) t; Loading Loading
src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java +21 −56 Original line number Diff line number Diff line Loading @@ -42,6 +42,8 @@ public class BulkProcessor<R, B> { private static final Logger log = LoggerFactory.getLogger(BulkProcessor.class); private static final AtomicLong BATCH_ID_GEN = new AtomicLong(); private final Time time; private final BulkClient<R, B> bulkClient; private final int maxBufferedRecords; Loading @@ -53,11 +55,6 @@ public class BulkProcessor<R, B> { private final Thread farmer; private final ExecutorService executor; // thread-safe stats private final AtomicLong createdBatches = new AtomicLong(); private final AtomicLong successfulRecords = new AtomicLong(); private final AtomicLong successfulBatches = new AtomicLong(); // thread-safe state, can be mutated safely without synchronization, // but may be part of synchronized(this) wait() conditions so need to notifyAll() on changes private volatile boolean stopRequested = false; Loading Loading @@ -105,7 +102,7 @@ public class BulkProcessor<R, B> { return new ThreadFactory() { @Override public Thread newThread(Runnable r) { final Thread t = new Thread(r, "BulkProcessor-t" + threadCounter.getAndIncrement()); final Thread t = new Thread(r, String.format("BulkProcessor@%d-%d", System.identityHashCode(this), threadCounter.getAndIncrement())); t.setDaemon(true); t.setUncaughtExceptionHandler(uncaughtExceptionHandler); return t; Loading Loading @@ -257,9 +254,12 @@ public class BulkProcessor<R, B> { /** * Add a record, may block upto {@code timeoutMs} if at capacity with respect to {@code maxBufferedRecords}. * * If any task has failed prior to or while blocked in the add, {@link ConnectException} will be thrown with that error. * If any task has failed prior to or while blocked in the add, or if the timeout expires while blocked, {@link ConnectException} will be thrown. */ public synchronized void add(R record, long timeoutMs) { throwIfTerminal(); if (bufferedRecords() >= maxBufferedRecords) { final long addStartTimeMs = time.milliseconds(); for (long elapsedMs = time.milliseconds() - addStartTimeMs; !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() >= maxBufferedRecords; Loading @@ -274,6 +274,8 @@ public class BulkProcessor<R, B> { if (bufferedRecords() >= maxBufferedRecords) { throw new ConnectException("Add timeout expired before buffer availability"); } } unsentRecords.addLast(record); notifyAll(); } Loading Loading @@ -309,7 +311,7 @@ public class BulkProcessor<R, B> { private final class BulkTask implements Callable<BulkResponse> { final long batchId = createdBatches.incrementAndGet(); final long batchId = BATCH_ID_GEN.incrementAndGet(); final List<R> batch; Loading Loading @@ -364,8 +366,6 @@ public class BulkProcessor<R, B> { } private synchronized void onBatchCompletion(int batchSize) { successfulBatches.incrementAndGet(); successfulRecords.addAndGet(batchSize); inFlightRecords -= batchSize; assert inFlightRecords >= 0; notifyAll(); Loading @@ -376,20 +376,6 @@ public class BulkProcessor<R, B> { stop(); } /** * @return count of currently buffered records */ public synchronized int unsentRecords() { return unsentRecords.size(); } /** * @return count of records currently in flight */ public synchronized int inFlightRecords() { return inFlightRecords; } /** * @return sum of unsent and in-flight record counts */ Loading @@ -397,27 +383,6 @@ public class BulkProcessor<R, B> { return unsentRecords.size() + inFlightRecords; } /** * @return count of batches that have been created */ public long createdBatches() { return createdBatches.get(); } /** * @return count of batches successfully executed */ public long successfulBatches() { return successfulBatches.get(); } /** * @return count of records successfully sent */ public long successfulRecords() { return successfulRecords.get(); } private static ConnectException toConnectException(Throwable t) { if (t instanceof ConnectException) { return (ConnectException) t; Loading