Commit c3dc37f0 authored by Randall Hauch's avatar Randall Hauch
Browse files

CC-1059 Changed ES connector to use exponential backoff with jitter

With lots of ES connector tasks all hitting the ES backend, when the ES backend becomes overloaded all of the tasks will experience timeouts (possibly at nearly the same time) and thus retry. Prior to this change, all tasks would use the same constant backoff time and would thus all retry at about the same point in time, possibly overwhelming the ES backend. This is known as a thundering herd, and when many attempts fail it takes a long time and many attempts to recover.

A solution to this problem is to use exponential backoff to give the ES backend time to recover, except that this alone doesn’t really reduce the thundering herd problem. To solve both problems we use exponential backoff but with jitter, which is a randomization of the sleep times for each of the attempts. This PR adds exponential backoff with jitter.

This new algorithm computes the normal maximum time to wait for a particular retry attempt using exponential backoff, and then chooses a random value between the `retry.backoff.ms` initial backoff value and that maximum time.

Since this exponential algorithm breaks down after a large number of retry attempts, rather than adding a constraint for `max.retries` this change simply uses a practical (and arbitrary) absolute upper limit on the backoff time of 24 hours, and it logs a warning if this upper limit is exceeded.
parent 1da19fb1
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
@@ -74,6 +75,16 @@ public class ElasticsearchSinkTask extends SinkTask {
      long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
      if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
        log.warn("This connector uses exponential backoff with jitter for retries, and using '{}={}' and '{}={}' " +
                "results in an impractical but possible maximum backoff time greater than {} hours.",
                ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, maxRetry,
                ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
                TimeUnit.MILLISECONDS.toHours(maxRetryBackoffMs));
      }

      if (client != null) {
        this.client = client;
      } else {
+73 −0
Original line number Diff line number Diff line
/**
 * Copyright 2017 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 **/
package io.confluent.connect.elasticsearch;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Utility to compute the retry times for a given attempt, using exponential backoff.
 * <p>
 * The purpose of using exponential backoff is to give the ES service time to recover when it becomes overwhelmed.
 * Adding jitter attempts to prevent a thundering herd, where large numbers of requests from many tasks overwhelm the
 * ES service, and without randomization all tasks retry at the same time. Randomization should spread the retries
 * out and should reduce the overall time required to complete all attempts.
 * See <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">this blog post</a> for details.
 */
public class RetryUtil {

  /**
   * An arbitrary absolute maximum practical retry time.
   */
  public static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(24);

  /**
   * Compute the time to sleep using exponential backoff with jitter. This method computes the normal exponential backoff
   * as {@code initialRetryBackoffMs << retryAttempts}, and then chooses a random value between {@code initialRetryBackoffMs}
   * and that value.
   *
   * @param retryAttempts the number of previous retry attempts; must be non-negative
   * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative
   * @return the non-negative time in milliseconds to wait before the next retry attempt, or 0 if {@code initialRetryBackoffMs} is negative
   */
  public static long computeRandomRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) {
    if (initialRetryBackoffMs < 0) return 0;
    if (retryAttempts <= 0) return initialRetryBackoffMs;
    long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs);
    // ThreadLocalRandom.nextLong(origin, bound) requires origin < bound, so guard against a
    // collapsed range. This happens when initialRetryBackoffMs is 0 (the exponential value is
    // also 0) and when initialRetryBackoffMs already meets or exceeds the MAX_RETRY_TIME_MS cap;
    // in either case return the (capped) maximum rather than throwing IllegalArgumentException.
    if (maxRetryTime <= initialRetryBackoffMs) {
      return maxRetryTime;
    }
    return ThreadLocalRandom.current().nextLong(initialRetryBackoffMs, maxRetryTime);
  }

  /**
   * Compute the time to sleep using exponential backoff. This method computes the normal exponential backoff
   * as {@code initialRetryBackoffMs << retryAttempts}, bounded to always be less than or equal to
   * {@link #MAX_RETRY_TIME_MS}.
   *
   * @param retryAttempts the number of previous retry attempts; must be non-negative
   * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative
   * @return the non-negative time in milliseconds to wait before the next retry attempt, or 0 if {@code initialRetryBackoffMs} is negative
   */
  public static long computeRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) {
    if (initialRetryBackoffMs < 0) return 0;
    if (retryAttempts <= 0) return initialRetryBackoffMs;
    if (retryAttempts > 32) {
      // A shift of more than 32 bits would wrap for any practical initial backoff, so use the cap.
      return MAX_RETRY_TIME_MS;
    }
    long result = initialRetryBackoffMs << retryAttempts;
    // A negative result indicates the shift overflowed; treat it as the capped maximum.
    return result < 0L ? MAX_RETRY_TIME_MS : Math.min(MAX_RETRY_TIME_MS, result);
  }

}
+14 −7
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
 **/
package io.confluent.connect.elasticsearch.bulk;

import io.confluent.connect.elasticsearch.RetryUtil;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@@ -341,28 +342,34 @@ public class BulkProcessor<R, B> {
        log.error("Failed to create bulk request from batch {} of {} records", batchId, batch.size(), e);
        throw e;
      }
      for (int remainingRetries = maxRetries; true; remainingRetries--) {
      final int maxAttempts = maxRetries + 1;
      for (int attempts = 1, retryAttempts = 0; true; ++attempts, ++retryAttempts) {
        boolean retriable = true;
        try {
          log.trace("Executing batch {} of {} records", batchId, batch.size());
          log.trace("Executing batch {} of {} records with attempt {}/{}", batchId, batch.size(), attempts, maxAttempts);
          final BulkResponse bulkRsp = bulkClient.execute(bulkReq);
          if (bulkRsp.isSucceeded()) {
            if (attempts > 1) {
              // We only logged failures, so log the success immediately after a failure ...
              log.debug("Completed batch {} of {} records with attempt {}/{}", batchId, batch.size(), attempts, maxAttempts);
            }
            return bulkRsp;
          }
          retriable = bulkRsp.isRetriable();
          throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
        } catch (Exception e) {
          if (retriable && remainingRetries > 0) {
            log.warn("Failed to execute batch {} of {} records, retrying after {} ms", batchId, batch.size(), retryBackoffMs, e);
            time.sleep(retryBackoffMs);
          if (retriable && attempts < maxAttempts) {
            long sleepTimeMs = RetryUtil.computeRandomRetryWaitTimeInMillis(retryAttempts, retryBackoffMs);
            log.warn("Failed to execute batch {} of {} records with attempt {}/{}, will attempt retry after {} ms. Failure reason: {}",
                      batchId, batch.size(), attempts, maxAttempts, sleepTimeMs, e.getMessage());
            time.sleep(sleepTimeMs);
          } else {
            log.error("Failed to execute batch {} of {} records", batchId, batch.size(), e);
            log.error("Failed to execute batch {} of {} records after total of {} attempt(s)", batchId, batch.size(), attempts, e);
            throw e;
          }
        }
      }
    }

  }

  private synchronized void onBatchCompletion(int batchSize) {
+59 −0
Original line number Diff line number Diff line
/**
 * Copyright 2017 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 **/
package io.confluent.connect.elasticsearch;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RetryUtilTest {

  /**
   * Verify that both the deterministic and the jittered backoff stay within the expected
   * range for a variety of positive initial backoff values and retry counts.
   */
  @Test
  public void computeRetryBackoffForValidRanges() {
    assertComputeRetryInRange(10, 10L);
    assertComputeRetryInRange(10, 100L);
    assertComputeRetryInRange(10, 1000L);
    assertComputeRetryInRange(100, 1000L);
  }

  /**
   * A negative initial backoff is treated as "no wait": both methods must return 0.
   */
  @Test
  public void computeRetryBackoffForNegativeRetryTimes() {
    assertComputeRetryInRange(1, -100L);
    assertComputeRetryInRange(10, -100L);
    assertComputeRetryInRange(100, -100L);
  }

  /**
   * The non-random backoff is pure exponential doubling of the initial backoff.
   */
  @Test
  public void computeNonRandomRetryTimes() {
    assertEquals(100L, RetryUtil.computeRetryWaitTimeInMillis(0, 100L));
    assertEquals(200L, RetryUtil.computeRetryWaitTimeInMillis(1, 100L));
    assertEquals(400L, RetryUtil.computeRetryWaitTimeInMillis(2, 100L));
    assertEquals(800L, RetryUtil.computeRetryWaitTimeInMillis(3, 100L));
    assertEquals(1600L, RetryUtil.computeRetryWaitTimeInMillis(4, 100L));
    assertEquals(3200L, RetryUtil.computeRetryWaitTimeInMillis(5, 100L));
  }

  /**
   * For each retry count up to {@code retryAttempts}, check the deterministic backoff and the
   * jittered backoff: a negative initial backoff must yield 0, while a positive one must yield a
   * value no smaller than the initial backoff, no larger than the absolute cap, and (for the
   * jittered value) no larger than the deterministic exponential value.
   */
  protected void assertComputeRetryInRange(int retryAttempts, long initialRetryBackoffMs) {
    for (int retries = 0; retries <= retryAttempts; ++retries) {
      long exponential = RetryUtil.computeRetryWaitTimeInMillis(retries, initialRetryBackoffMs);
      long jittered = RetryUtil.computeRandomRetryWaitTimeInMillis(retries, initialRetryBackoffMs);
      if (initialRetryBackoffMs < 0) {
        assertEquals(0, exponential);
        assertEquals(0, jittered);
      } else {
        assertTrue(exponential >= initialRetryBackoffMs);
        assertTrue(exponential <= RetryUtil.MAX_RETRY_TIME_MS);
        assertTrue(jittered >= initialRetryBackoffMs);
        assertTrue(jittered <= exponential);
      }
    }
  }
}
 No newline at end of file
+37 −0
Original line number Diff line number Diff line
@@ -233,6 +233,43 @@ public class BulkProcessorTest {
    assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded);
  }

  // Verifies that a batch failing with a retriable error on every attempt eventually gives up:
  // with maxRetries = 2 the processor makes 3 attempts total (initial + 2 retries), and the
  // exception surfaced to the caller carries the error info from the FINAL failed attempt.
  @Test
  public void retriableErrorsHitMaxRetries() throws InterruptedException, ExecutionException {
    final int maxBufferedRecords = 100;
    final int maxInFlightBatches = 5;
    final int batchSize = 2;
    final int lingerMs = 5;
    final int maxRetries = 2;
    final int retryBackoffMs = 1; // keep the (jittered) sleeps negligible so the test stays fast
    final String errorInfo = "a final retriable error again";

    // Queue exactly maxRetries + 1 retriable failures — one per attempt — so every retry is
    // exhausted; the last response's error info is what the processor should report.
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retiable error"));
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retriable error again"));
    client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, errorInfo));

    final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        client,
        maxBufferedRecords,
        maxInFlightBatches,
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
    );

    final int addTimeoutMs = 10;
    bulkProcessor.add(42, addTimeoutMs);
    bulkProcessor.add(43, addTimeoutMs);

    // Both records fit in one batch (batchSize = 2); the batch must ultimately fail with the
    // final attempt's error wrapped in an ExecutionException.
    try {
      bulkProcessor.submitBatchWhenReady().get();
      fail();
    } catch (ExecutionException e) {
      assertTrue(e.getCause().getMessage().contains(errorInfo));
    }
  }

  @Test
  public void unretriableErrors() throws InterruptedException {
    final int maxBufferedRecords = 100;