Loading docs/elasticsearch_connector.rst +15 −14 Original line number Diff line number Diff line Loading @@ -250,7 +250,8 @@ uses an exponential backoff technique to give the Elasticsearch service time to adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out the retries from many tasks and should reduce the overall time required to complete all outstanding requests compared to simple exponential backoff. compared to simple exponential backoff. The goal is to spread out the requests to Elasticsearch as much as possible. The number of retries is dictated by the ``max.retries`` connector configuration property, which defaults to 5 attempts. The backoff time, which is the amount of time to wait before retrying, is a function of the Loading @@ -264,11 +265,11 @@ before submitting each of the 5 retry attempts: ===== ===================== ===================== ============================================== Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec) ===== ===================== ===================== ============================================== 1 0.5 0.5 0.5 2 0.5 1.0 1.5 3 0.5 2.0 3.5 4 0.5 4.0 7.5 5 0.5 8.0 15.5 1 0.0 0.5 0.5 2 0.0 1.0 1.5 3 0.0 2.0 3.5 4 0.0 4.0 7.5 5 0.0 8.0 15.5 ===== ===================== ===================== ============================================== Note how the maximum wait time is simply the normal exponential backoff, calculated as ``${retry.backoff.ms} * 2 ^ (retry-1)``. 
Loading @@ -280,14 +281,14 @@ Increasing the maximum number of retries adds more backoff: ===== ===================== ===================== ============================================== Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec) ===== ===================== ===================== ============================================== 6 0.5 16.0 31.5 7 0.5 32.0 63.5 8 0.5 64.0 127.5 9 0.5 128.0 255.5 10 0.5 256.0 511.5 11 0.5 512.0 1023.5 12 0.5 1024.0 2047.5 13 0.5 2048.0 4095.5 6 0.0 16.0 31.5 7 0.0 32.0 63.5 8 0.0 64.0 127.5 9 0.0 128.0 255.5 10 0.0 256.0 511.5 11 0.0 512.0 1023.5 12 0.0 1024.0 2047.5 13 0.0 2048.0 4095.5 ===== ===================== ===================== ============================================== By increasing ``max.retries`` to 10, the connector may take up to 511.5 seconds, or a little over 8.5 minutes, Loading src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java +4 −5 Original line number Diff line number Diff line Loading @@ -36,8 +36,7 @@ public class RetryUtil { /** * Compute the time to sleep using exponential backoff with jitter. This method computes the normal exponential backoff * as {@code initialRetryBackoffMs << retryAttempt}, and then chooses a random value between {@code initialRetryBackoffMs} * and that value. * as {@code initialRetryBackoffMs << retryAttempt}, and then chooses a random value between 0 and that value. 
* * @param retryAttempts the number of previous retry attempts; must be non-negative * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative Loading @@ -45,14 +44,14 @@ public class RetryUtil { */ public static long computeRandomRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) { if (initialRetryBackoffMs < 0) return 0; if (retryAttempts <= 0) return initialRetryBackoffMs; if (retryAttempts < 0) return initialRetryBackoffMs; long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs); return ThreadLocalRandom.current().nextLong(initialRetryBackoffMs, maxRetryTime); return ThreadLocalRandom.current().nextLong(0, maxRetryTime); } /** * Compute the time to sleep using exponential backoff. This method computes the normal exponential backoff * as {@code initialRetryBackoffMs << retryAttempt}. bounded to always be less than {@link #MAX_RETRY_TIME_MS}. * as {@code initialRetryBackoffMs << retryAttempt}, bounded to always be less than {@link #MAX_RETRY_TIME_MS}. 
* * @param retryAttempts the number of previous retry attempts; must be non-negative * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative Loading src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java +11 −7 Original line number Diff line number Diff line Loading @@ -46,13 +46,17 @@ public class RetryUtilTest { assertEquals(3200L, RetryUtil.computeRetryWaitTimeInMillis(5, 100L)); } protected void assertComputeRetryInRange(int retryAttempts, long initialRetryBackoffMs) { protected void assertComputeRetryInRange(int retryAttempts, long retryBackoffMs) { for (int i = 0; i != 20; ++i) { for (int retries = 0; retries <= retryAttempts; ++retries) { long result = RetryUtil.computeRetryWaitTimeInMillis(retries, initialRetryBackoffMs); if (initialRetryBackoffMs < 0) { long maxResult = RetryUtil.computeRetryWaitTimeInMillis(retries, retryBackoffMs); long result = RetryUtil.computeRandomRetryWaitTimeInMillis(retries, retryBackoffMs); if (retryBackoffMs < 0) { assertEquals(0, result); } else { assertTrue(result >= initialRetryBackoffMs); assertTrue(result >= 0L); assertTrue(result <= maxResult); } } } } Loading Loading
docs/elasticsearch_connector.rst +15 −14 Original line number Diff line number Diff line Loading @@ -250,7 +250,8 @@ uses an exponential backoff technique to give the Elasticsearch service time to adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out the retries from many tasks and should reduce the overall time required to complete all outstanding requests compared to simple exponential backoff. compared to simple exponential backoff. The goal is to spread out the requests to Elasticsearch as much as possible. The number of retries is dictated by the ``max.retries`` connector configuration property, which defaults to 5 attempts. The backoff time, which is the amount of time to wait before retrying, is a function of the Loading @@ -264,11 +265,11 @@ before submitting each of the 5 retry attempts: ===== ===================== ===================== ============================================== Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec) ===== ===================== ===================== ============================================== 1 0.5 0.5 0.5 2 0.5 1.0 1.5 3 0.5 2.0 3.5 4 0.5 4.0 7.5 5 0.5 8.0 15.5 1 0.0 0.5 0.5 2 0.0 1.0 1.5 3 0.0 2.0 3.5 4 0.0 4.0 7.5 5 0.0 8.0 15.5 ===== ===================== ===================== ============================================== Note how the maximum wait time is simply the normal exponential backoff, calculated as ``${retry.backoff.ms} * 2 ^ (retry-1)``. 
Loading @@ -280,14 +281,14 @@ Increasing the maximum number of retries adds more backoff: ===== ===================== ===================== ============================================== Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec) ===== ===================== ===================== ============================================== 6 0.5 16.0 31.5 7 0.5 32.0 63.5 8 0.5 64.0 127.5 9 0.5 128.0 255.5 10 0.5 256.0 511.5 11 0.5 512.0 1023.5 12 0.5 1024.0 2047.5 13 0.5 2048.0 4095.5 6 0.0 16.0 31.5 7 0.0 32.0 63.5 8 0.0 64.0 127.5 9 0.0 128.0 255.5 10 0.0 256.0 511.5 11 0.0 512.0 1023.5 12 0.0 1024.0 2047.5 13 0.0 2048.0 4095.5 ===== ===================== ===================== ============================================== By increasing ``max.retries`` to 10, the connector may take up to 511.5 seconds, or a little over 8.5 minutes, Loading
src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java +4 −5 Original line number Diff line number Diff line Loading @@ -36,8 +36,7 @@ public class RetryUtil { /** * Compute the time to sleep using exponential backoff with jitter. This method computes the normal exponential backoff * as {@code initialRetryBackoffMs << retryAttempt}, and then chooses a random value between {@code initialRetryBackoffMs} * and that value. * as {@code initialRetryBackoffMs << retryAttempt}, and then chooses a random value between 0 and that value. * * @param retryAttempts the number of previous retry attempts; must be non-negative * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative Loading @@ -45,14 +44,14 @@ public class RetryUtil { */ public static long computeRandomRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) { if (initialRetryBackoffMs < 0) return 0; if (retryAttempts <= 0) return initialRetryBackoffMs; if (retryAttempts < 0) return initialRetryBackoffMs; long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs); return ThreadLocalRandom.current().nextLong(initialRetryBackoffMs, maxRetryTime); return ThreadLocalRandom.current().nextLong(0, maxRetryTime); } /** * Compute the time to sleep using exponential backoff. This method computes the normal exponential backoff * as {@code initialRetryBackoffMs << retryAttempt}. bounded to always be less than {@link #MAX_RETRY_TIME_MS}. * as {@code initialRetryBackoffMs << retryAttempt}, bounded to always be less than {@link #MAX_RETRY_TIME_MS}. * * @param retryAttempts the number of previous retry attempts; must be non-negative * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative Loading
src/test/java/io/confluent/connect/elasticsearch/RetryUtilTest.java +11 −7 Original line number Diff line number Diff line Loading @@ -46,13 +46,17 @@ public class RetryUtilTest { assertEquals(3200L, RetryUtil.computeRetryWaitTimeInMillis(5, 100L)); } protected void assertComputeRetryInRange(int retryAttempts, long initialRetryBackoffMs) { protected void assertComputeRetryInRange(int retryAttempts, long retryBackoffMs) { for (int i = 0; i != 20; ++i) { for (int retries = 0; retries <= retryAttempts; ++retries) { long result = RetryUtil.computeRetryWaitTimeInMillis(retries, initialRetryBackoffMs); if (initialRetryBackoffMs < 0) { long maxResult = RetryUtil.computeRetryWaitTimeInMillis(retries, retryBackoffMs); long result = RetryUtil.computeRandomRetryWaitTimeInMillis(retries, retryBackoffMs); if (retryBackoffMs < 0) { assertEquals(0, result); } else { assertTrue(result >= initialRetryBackoffMs); assertTrue(result >= 0L); assertTrue(result <= maxResult); } } } } Loading