Loading src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java +63 −4 Original line number Diff line number Diff line Loading @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class BulkProcessorTest { Loading Loading @@ -160,13 +161,71 @@ public class BulkProcessorTest { } @Test public void retriableErrors() { // TODO public void retriableErrors() throws InterruptedException, ExecutionException { final Client client = new Client(); final int maxBufferedRecords = 100; final int maxInFlightBatches = 5; final int batchSize = 2; final int lingerMs = 5; final int maxRetries = 3; final int retryBackoffMs = 1; client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retiable error")); client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retriable error again")); client.expect(Arrays.asList(42, 43), BulkResponse.success()); final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>( new SystemTime(), client, maxBufferedRecords, maxInFlightBatches, batchSize, lingerMs, maxRetries, retryBackoffMs ); bulkProcessor.add(42); bulkProcessor.add(43); assertTrue(bulkProcessor.tick().get().succeeded); } @Test public void unretriableErrors() { // TODO public void unretriableErrors() throws InterruptedException { final Client client = new Client(); final int maxBufferedRecords = 100; final int maxInFlightBatches = 5; final int batchSize = 2; final int lingerMs = 5; final int maxRetries = 3; final int retryBackoffMs = 1; final String errorInfo = "an unretriable error"; client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, errorInfo)); final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>( new SystemTime(), client, maxBufferedRecords, maxInFlightBatches, batchSize, lingerMs, maxRetries, retryBackoffMs ); bulkProcessor.add(42); bulkProcessor.add(43); try { bulkProcessor.tick().get(); fail(); } catch (ExecutionException e) { assertTrue(e.getCause().getMessage().contains(errorInfo)); } } } Loading
src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java +63 −4 Original line number Diff line number Diff line Loading @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class BulkProcessorTest { Loading Loading @@ -160,13 +161,71 @@ public class BulkProcessorTest { } @Test public void retriableErrors() { // TODO public void retriableErrors() throws InterruptedException, ExecutionException { final Client client = new Client(); final int maxBufferedRecords = 100; final int maxInFlightBatches = 5; final int batchSize = 2; final int lingerMs = 5; final int maxRetries = 3; final int retryBackoffMs = 1; client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retiable error")); client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retriable error again")); client.expect(Arrays.asList(42, 43), BulkResponse.success()); final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>( new SystemTime(), client, maxBufferedRecords, maxInFlightBatches, batchSize, lingerMs, maxRetries, retryBackoffMs ); bulkProcessor.add(42); bulkProcessor.add(43); assertTrue(bulkProcessor.tick().get().succeeded); } @Test public void unretriableErrors() { // TODO public void unretriableErrors() throws InterruptedException { final Client client = new Client(); final int maxBufferedRecords = 100; final int maxInFlightBatches = 5; final int batchSize = 2; final int lingerMs = 5; final int maxRetries = 3; final int retryBackoffMs = 1; final String errorInfo = "an unretriable error"; client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, errorInfo)); final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>( new SystemTime(), client, maxBufferedRecords, maxInFlightBatches, batchSize, lingerMs, maxRetries, retryBackoffMs ); bulkProcessor.add(42); bulkProcessor.add(43); try { bulkProcessor.tick().get(); fail(); } catch (ExecutionException e) { assertTrue(e.getCause().getMessage().contains(errorInfo)); } } }