Commit e86bbd1c authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

test for add() blocking when buffer full

parent d6ea6404
Loading
Loading
Loading
Loading
+0 −4
Original line number Diff line number Diff line
@@ -254,10 +254,6 @@ public class BulkProcessor<R, B> {
    throwIfStopping();
  }

  public synchronized void add(R record) {
    add(record, Long.MAX_VALUE);
  }

  /**
   * Add a record, may block upto {@code timeoutMs} if at capacity with respect to {@code maxBufferedRecords}.
   *
+58 −19
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@
package io.confluent.connect.elasticsearch.bulk;

import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.Test;

import java.io.IOException;
@@ -101,18 +102,19 @@ public class BulkProcessorTest {
        retryBackoffMs
    );

    bulkProcessor.add(1);
    bulkProcessor.add(2);
    bulkProcessor.add(3);
    bulkProcessor.add(4);
    bulkProcessor.add(5);
    bulkProcessor.add(6);
    bulkProcessor.add(7);
    bulkProcessor.add(8);
    bulkProcessor.add(9);
    bulkProcessor.add(10);
    bulkProcessor.add(11);
    bulkProcessor.add(12);
    final int addTimeoutMs = 10;
    bulkProcessor.add(1, addTimeoutMs);
    bulkProcessor.add(2, addTimeoutMs);
    bulkProcessor.add(3, addTimeoutMs);
    bulkProcessor.add(4, addTimeoutMs);
    bulkProcessor.add(5, addTimeoutMs);
    bulkProcessor.add(6, addTimeoutMs);
    bulkProcessor.add(7, addTimeoutMs);
    bulkProcessor.add(8, addTimeoutMs);
    bulkProcessor.add(9, addTimeoutMs);
    bulkProcessor.add(10, addTimeoutMs);
    bulkProcessor.add(11, addTimeoutMs);
    bulkProcessor.add(12, addTimeoutMs);

    client.expect(Arrays.asList(1, 2, 3, 4, 5), BulkResponse.success());
    client.expect(Arrays.asList(6, 7, 8, 9, 10), BulkResponse.success());
@@ -148,9 +150,11 @@ public class BulkProcessorTest {
    client.expect(Arrays.asList(1, 2, 3), BulkResponse.success());

    bulkProcessor.start();
    bulkProcessor.add(1);
    bulkProcessor.add(2);
    bulkProcessor.add(3);

    final int addTimeoutMs = 10;
    bulkProcessor.add(1, addTimeoutMs);
    bulkProcessor.add(2, addTimeoutMs);
    bulkProcessor.add(3, addTimeoutMs);

    assertFalse(client.expectationsMet());

@@ -160,6 +164,39 @@ public class BulkProcessorTest {
    assertTrue(client.expectationsMet());
  }

  @Test
  public void addBlocksWhenBufferFull() {
    final Client client = new Client();

    final int maxBufferedRecords = 1;
    final int maxInFlightBatches = 1;
    final int batchSize = 1;
    final int lingerMs = 10;
    final int maxRetries = 0;
    final int retryBackoffMs = 0;

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        client,
        maxBufferedRecords,
        maxInFlightBatches,
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
    );

    final int addTimeoutMs = 10;
    bulkProcessor.add(42, addTimeoutMs);
    assertEquals(1, bulkProcessor.bufferedRecords());
    try {
      // BulkProcessor not started, so this add should timeout & throw
      bulkProcessor.add(43, addTimeoutMs);
      fail();
    } catch (ConnectException good) {
    }
  }

  @Test
  public void retriableErrors() throws InterruptedException, ExecutionException {
    final Client client = new Client();
@@ -186,8 +223,9 @@ public class BulkProcessorTest {
        retryBackoffMs
    );

    bulkProcessor.add(42);
    bulkProcessor.add(43);
    final int addTimeoutMs = 10;
    bulkProcessor.add(42, addTimeoutMs);
    bulkProcessor.add(43, addTimeoutMs);

    assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded);
  }
@@ -217,8 +255,9 @@ public class BulkProcessorTest {
        retryBackoffMs
    );

    bulkProcessor.add(42);
    bulkProcessor.add(43);
    final int addTimeoutMs = 10;
    bulkProcessor.add(42, addTimeoutMs);
    bulkProcessor.add(43, addTimeoutMs);

    try {
      bulkProcessor.submitBatchWhenReady().get();