Loading src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java +2 −2 Original line number Diff line number Diff line Loading @@ -120,7 +120,7 @@ public class BulkProcessor<R, B> { log.debug("Starting farmer task"); try { while (!stopRequested) { tick(); submitBatchWhenReady(); } } catch (InterruptedException e) { throw new ConnectException(e); Loading @@ -131,7 +131,7 @@ public class BulkProcessor<R, B> { } // Visible for testing synchronized Future<BulkResponse> tick() throws InterruptedException { synchronized Future<BulkResponse> submitBatchWhenReady() throws InterruptedException { for (long waitStartTimeMs = time.milliseconds(), elapsedMs = 0; !stopRequested && !canSubmit(elapsedMs); elapsedMs = time.milliseconds() - waitStartTimeMs) { Loading src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java +5 −5 Original line number Diff line number Diff line Loading @@ -117,9 +117,9 @@ public class BulkProcessorTest { client.expect(Arrays.asList(1, 2, 3, 4, 5), BulkResponse.success()); client.expect(Arrays.asList(6, 7, 8, 9, 10), BulkResponse.success()); client.expect(Arrays.asList(11, 12), BulkResponse.success()); // batch not full, but upon linger timeout assertTrue(bulkProcessor.tick().get().succeeded); assertTrue(bulkProcessor.tick().get().succeeded); assertTrue(bulkProcessor.tick().get().succeeded); assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded); assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded); assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded); assertTrue(client.expectationsMet()); } Loading Loading @@ -189,7 +189,7 @@ public class BulkProcessorTest { bulkProcessor.add(42); bulkProcessor.add(43); assertTrue(bulkProcessor.tick().get().succeeded); assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded); } @Test Loading Loading @@ -221,7 +221,7 @@ public class BulkProcessorTest { bulkProcessor.add(43); try { bulkProcessor.tick().get(); bulkProcessor.submitBatchWhenReady().get(); fail(); } catch (ExecutionException e) { assertTrue(e.getCause().getMessage().contains(errorInfo)); Loading Loading
src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java +2 −2 Original line number Diff line number Diff line Loading @@ -120,7 +120,7 @@ public class BulkProcessor<R, B> { log.debug("Starting farmer task"); try { while (!stopRequested) { tick(); submitBatchWhenReady(); } } catch (InterruptedException e) { throw new ConnectException(e); Loading @@ -131,7 +131,7 @@ public class BulkProcessor<R, B> { } // Visible for testing synchronized Future<BulkResponse> tick() throws InterruptedException { synchronized Future<BulkResponse> submitBatchWhenReady() throws InterruptedException { for (long waitStartTimeMs = time.milliseconds(), elapsedMs = 0; !stopRequested && !canSubmit(elapsedMs); elapsedMs = time.milliseconds() - waitStartTimeMs) { Loading
src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java +5 −5 Original line number Diff line number Diff line Loading @@ -117,9 +117,9 @@ public class BulkProcessorTest { client.expect(Arrays.asList(1, 2, 3, 4, 5), BulkResponse.success()); client.expect(Arrays.asList(6, 7, 8, 9, 10), BulkResponse.success()); client.expect(Arrays.asList(11, 12), BulkResponse.success()); // batch not full, but upon linger timeout assertTrue(bulkProcessor.tick().get().succeeded); assertTrue(bulkProcessor.tick().get().succeeded); assertTrue(bulkProcessor.tick().get().succeeded); assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded); assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded); assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded); assertTrue(client.expectationsMet()); } Loading Loading @@ -189,7 +189,7 @@ public class BulkProcessorTest { bulkProcessor.add(42); bulkProcessor.add(43); assertTrue(bulkProcessor.tick().get().succeeded); assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded); } @Test Loading Loading @@ -221,7 +221,7 @@ public class BulkProcessorTest { bulkProcessor.add(43); try { bulkProcessor.tick().get(); bulkProcessor.submitBatchWhenReady().get(); fail(); } catch (ExecutionException e) { assertTrue(e.getCause().getMessage().contains(errorInfo)); Loading