Commit f62c5875 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

Not useful to track failed batches as failures are fatal

parent cfa5b07a
Loading
Loading
Loading
Loading
+11 −12
Original line number Diff line number Diff line
@@ -56,9 +56,7 @@ public class BulkProcessor<R, B> {
  // thread-safe stats
  private final AtomicLong createdBatches = new AtomicLong();
  private final AtomicLong successfulRecords = new AtomicLong();
  private final AtomicLong failedRecords = new AtomicLong();
  private final AtomicLong successfulBatches = new AtomicLong();
  private final AtomicLong failedBatches = new AtomicLong();

  // thread-safe state, can be mutated safely without synchronization,
  // but may be part of synchronized(this) wait() conditions so need to notifyAll() on changes
@@ -327,7 +325,7 @@ public class BulkProcessor<R, B> {
      try {
        rsp = execute();
      } catch (Exception e) {
        onBatchFailure(batch.size(), e);
        failAndStop(e);
        throw e;
      }
      log.debug("Successfully executed batch {} of {} records", batchId, batch.size());
@@ -375,12 +373,6 @@ public class BulkProcessor<R, B> {
    notifyAll();
  }

  private void onBatchFailure(int batchSize, Exception e) {
    failedBatches.incrementAndGet();
    failedRecords.addAndGet(batchSize);
    failAndStop(e);
  }

  private void failAndStop(Throwable t) {
    error.compareAndSet(null, toConnectException(t));
    stop();
@@ -407,6 +399,13 @@ public class BulkProcessor<R, B> {
    return unsentRecords.size() + inFlightRecords;
  }

  /**
   * @return count of batches that have been created
   */
  public long createdBatches() {
    return createdBatches.get();
  }

  /**
   * @return count of batches successfully executed
   */
@@ -415,10 +414,10 @@ public class BulkProcessor<R, B> {
  }

  /**
   * @return count of batches that failed
   * @return count of records successfully sent
   */
  public long failedBatches() {
    return failedBatches.get();
  public long successfulRecords() {
    return successfulRecords.get();
  }

  private static ConnectException toConnectException(Throwable t) {