Commit 6990d14f authored by Randall Hauch, committed by GitHub

Merge pull request #116 from rhauch/cc-1059b

CC-1059 Changed ES connector to use exponential backoff with jitter
parents 1da19fb1 16fc108c
+1 −1
@@ -55,7 +55,7 @@ Connector
  * Importance: low

``retry.backoff.ms``
  How long to wait in milliseconds before attempting to retry a failed indexing request. This avoids retrying in a tight loop under failure scenarios.
  How long to wait in milliseconds before attempting to retry the first failed indexing request. This connector uses exponential backoff with jitter, which means that upon additional failures, it may wait up to twice as long as the previous wait, up to the maximum number of retries. This avoids retrying in a tight loop under failure scenarios.

  * Type: long
  * Default: 100
+58 −0
@@ -241,6 +241,64 @@ The following change is not allowed:

As mappings are more flexible, schema compatibility should be enforced when writing data to Kafka.

Automatic Retries
-----------------
The Elasticsearch connector may experience problems writing to the Elasticsearch endpoint, such as when
the Elasticsearch service is temporarily overloaded. In many cases, the connector will retry the request
a number of times before failing. To avoid further overloading the Elasticsearch service, the connector
uses an exponential backoff technique to give the Elasticsearch service time to recover. The technique
adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large
numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out
the retries from many tasks and should reduce the overall time required to complete all outstanding requests
compared to simple exponential backoff. The goal is to spread out the requests to Elasticsearch as much as
possible.

The number of retries is dictated by the ``max.retries`` connector configuration property, which defaults
to 5 attempts. The backoff time, which is the amount of time to wait before retrying, is a function of the
retry attempt number and the initial backoff time specified in the ``retry.backoff.ms`` connector configuration
property, which defaults to 100 milliseconds. For example, with ``retry.backoff.ms`` set to 500 milliseconds,
the following table shows the possible wait times before submitting each of the 5 retry attempts:

.. table:: Range of backoff times for each retry with ``retry.backoff.ms=500``
   :widths: auto

   =====  =====================  =====================  ==============================================
   Retry  Minimum Backoff (sec)  Maximum Backoff (sec)  Total Potential Delay from First Attempt (sec)
   =====  =====================  =====================  ==============================================
     1         0.0                      0.5                              0.5
     2         0.0                      1.0                              1.5
     3         0.0                      2.0                              3.5
     4         0.0                      4.0                              7.5
     5         0.0                      8.0                             15.5
   =====  =====================  =====================  ==============================================

Note how the maximum wait time is simply the normal exponential backoff, calculated as ``${retry.backoff.ms} * 2 ^ (retry-1)``.
Increasing the maximum number of retries adds more backoff:

.. table:: Range of backoff times for additional retries
   :widths: auto

   =====  =====================  =====================  ==============================================
   Retry  Minimum Backoff (sec)  Maximum Backoff (sec)  Total Potential Delay from First Attempt (sec)
   =====  =====================  =====================  ==============================================
     6         0.0                     16.0                             31.5
     7         0.0                     32.0                             63.5
     8         0.0                     64.0                            127.5
     9         0.0                    128.0                            255.5
    10         0.0                    256.0                            511.5
    11         0.0                    512.0                           1023.5
    12         0.0                   1024.0                           2047.5
    13         0.0                   2048.0                           4095.5
   =====  =====================  =====================  ==============================================

By increasing ``max.retries`` to 10, the connector may take up to 511.5 seconds, or a little over 8.5 minutes,
to successfully send a batch of records when experiencing an overloaded Elasticsearch service. Increasing the value
to 13 raises the maximum potential time to submit a batch of records to 4095.5 seconds, or just over 1 hour and 8 minutes.

You can adjust both the ``max.retries`` and ``retry.backoff.ms`` connector configuration properties to achieve
the desired backoff and retry characteristics.
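The backoff ranges in the tables above follow directly from the formula ``retry.backoff.ms * 2 ^ (retry-1)``. The following sketch (plain Java; the helper names ``maxBackoffMs`` and ``jitteredBackoffMs`` are hypothetical, not part of the connector) reproduces the maximum-backoff column for the first table and shows the jitter step described earlier:

```java
import java.util.concurrent.ThreadLocalRandom;

public class BackoffTableDemo {

    // Maximum (un-jittered) backoff for a given retry: initial * 2^(retry - 1)
    static long maxBackoffMs(int retry, long initialBackoffMs) {
        return initialBackoffMs << (retry - 1);
    }

    // With jitter, the actual wait is a uniformly random value in [0, max]
    static long jitteredBackoffMs(int retry, long initialBackoffMs) {
        return ThreadLocalRandom.current().nextLong(0, maxBackoffMs(retry, initialBackoffMs) + 1);
    }

    public static void main(String[] args) {
        long initialBackoffMs = 500L; // the retry.backoff.ms value used in the tables above
        long cumulativeMs = 0;
        for (int retry = 1; retry <= 5; retry++) {
            long maxMs = maxBackoffMs(retry, initialBackoffMs);
            cumulativeMs += maxMs;
            System.out.println("retry " + retry + ": up to " + maxMs
                    + " ms, cumulative " + cumulativeMs + " ms");
        }
    }
}
```

Running the loop yields the same maxima as the first table (500 ms through 8000 ms, 15500 ms cumulative); each actual wait is then drawn uniformly from 0 up to that cap.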


Reindexing
----------
In some cases, the way to index a set of documents may need to be changed. For example, the analyzer,
+2 −1
@@ -78,7 +78,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
                  + "If the retry attempts are exhausted the task will fail.",
                  group, ++order, Width.SHORT, "Max Retries")
          .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, Importance.LOW,
                  "How long to wait in milliseconds before attempting to retry a failed indexing request. "
                  "How long to wait in milliseconds before attempting the first retry of a failed indexing request. "
                  + "Upon a failure, this connector may wait up to twice as long as the previous wait, up to the maximum number of retries. "
                  + "This avoids retrying in a tight loop under failure scenarios.",
                  group, ++order, Width.SHORT, "Retry Backoff (ms)");
    }
+11 −0
@@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
@@ -74,6 +75,16 @@ public class ElasticsearchSinkTask extends SinkTask {
      long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);

      // Calculate the maximum possible backoff time ...
      long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
      if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
        log.warn("This connector uses exponential backoff with jitter for retries, and using '{}={}' and '{}={}' " +
                "results in an impractical but possible maximum backoff time greater than {} hours.",
                ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, maxRetry,
                ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
                TimeUnit.MILLISECONDS.toHours(maxRetryBackoffMs));
      }

      if (client != null) {
        this.client = client;
      } else {
+72 −0
/**
 * Copyright 2017 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 **/
package io.confluent.connect.elasticsearch;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Utility to compute the retry times for a given attempt, using exponential backoff.
 * <p>
 * The purpose of using exponential backoff is to give the ES service time to recover when it becomes overwhelmed.
 * Adding jitter attempts to prevent a thundering herd, where large numbers of requests from many tasks overwhelm the
 * ES service, and without randomization all tasks retry at the same time. Randomization should spread the retries
 * out and should reduce the overall time required to complete all attempts.
 * See <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">this blog post</a> for details.
 */
public class RetryUtil {

  /**
   * An arbitrary absolute maximum practical retry time.
   */
  public static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(24);

  /**
   * Compute the time to sleep using exponential backoff with jitter. This method computes the normal exponential backoff
   * as {@code initialRetryBackoffMs << retryAttempts}, and then chooses a random value between 0 and that value.
   *
   * @param retryAttempts the number of previous retry attempts; must be non-negative
   * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative
   * @return the non-negative time in milliseconds to wait before the next retry attempt, or 0 if {@code initialRetryBackoffMs} is negative
   */
  public static long computeRandomRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) {
    if (initialRetryBackoffMs < 0) return 0;
    if (retryAttempts < 0) return initialRetryBackoffMs;
    long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs);
    if (maxRetryTime <= 0) return 0; // nextLong requires bound > origin
    return ThreadLocalRandom.current().nextLong(0, maxRetryTime);
  }

  /**
   * Compute the time to sleep using exponential backoff. This method computes the normal exponential backoff
   * as {@code initialRetryBackoffMs << retryAttempts}, bounded to be at most {@link #MAX_RETRY_TIME_MS}.
   *
   * @param retryAttempts the number of previous retry attempts; must be non-negative
   * @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative
   * @return the non-negative time in milliseconds to wait before the next retry attempt, or 0 if {@code initialRetryBackoffMs} is negative
   */
  public static long computeRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) {
    if (initialRetryBackoffMs < 0) return 0;
    if (retryAttempts <= 0) return initialRetryBackoffMs;
    if (retryAttempts > 32) {
      // This would overflow the exponential algorithm ...
      return MAX_RETRY_TIME_MS;
    }
    long result = initialRetryBackoffMs << retryAttempts;
    return result < 0L ? MAX_RETRY_TIME_MS : Math.min(MAX_RETRY_TIME_MS, result);
  }


}
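As a quick sanity check of the bounding logic above, here is a self-contained sketch that inlines a trimmed copy of ``computeRetryWaitTimeInMillis`` (copied so it compiles outside the connector codebase) and probes its edge cases:

```java
import java.util.concurrent.TimeUnit;

public class RetryUtilCheck {

    static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(24);

    // Trimmed copy of RetryUtil.computeRetryWaitTimeInMillis for a standalone check
    static long computeRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) {
        if (initialRetryBackoffMs < 0) return 0;
        if (retryAttempts <= 0) return initialRetryBackoffMs;
        if (retryAttempts > 32) return MAX_RETRY_TIME_MS; // a larger shift would overflow
        long result = initialRetryBackoffMs << retryAttempts;
        return result < 0L ? MAX_RETRY_TIME_MS : Math.min(MAX_RETRY_TIME_MS, result);
    }

    public static void main(String[] args) {
        System.out.println(computeRetryWaitTimeInMillis(0, 500L));  // 500: no prior attempts, initial backoff
        System.out.println(computeRetryWaitTimeInMillis(1, 500L));  // 1000: one prior attempt doubles the wait
        System.out.println(computeRetryWaitTimeInMillis(40, 500L)); // 86400000: capped at 24 hours
        System.out.println(computeRetryWaitTimeInMillis(5, -1L));   // 0: negative backoff treated as none
    }
}
```

Note that the cap means the method can return exactly 24 hours in milliseconds, which is what triggers the warning logged in ``ElasticsearchSinkTask``.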