Commit 7d3a410f authored by Shikhar Bhushan's avatar Shikhar Bhushan

CC-331: update config options docs

parent bd2768ed
+1 −0
target
docs/_build
.idea
*.iml
+51 −41
Configuration Options
---------------------

Connector
^^^^^^^^^

``connection.url``
  Elasticsearch HTTP connection URL e.g. ``http://eshost:9200``.

  * Type: string
  * Default: ""
  * Importance: high

``batch.size``
  The number of records to process as a batch when writing to Elasticsearch.

  * Type: int
  * Default: 2000
  * Importance: medium

``max.buffered.records``
  The maximum number of records each task will buffer before blocking acceptance of more records. This config can be used to limit the memory usage for each task.

  * Type: int
  * Default: 20000
  * Importance: low

``linger.ms``
  Linger time in milliseconds for batching.

  Records that arrive in between request transmissions are batched into a single bulk indexing request, based on the ``batch.size`` configuration. Normally this only occurs under load when records arrive faster than they can be sent out. However it may be desirable to reduce the number of requests even under light load and benefit from bulk indexing. This setting helps accomplish that - rather than immediately sending out a record, the task will wait up to the given delay to allow other records to be added so that they can be batched into a single request.

  * Type: long
  * Default: 1
  * Importance: low

``flush.timeout.ms``
  The timeout in milliseconds when flushing records to Elasticsearch.

  * Type: long
  * Default: 10000
  * Importance: low

``max.in.flight.requests``
  The maximum number of indexing requests that can be in-flight to Elasticsearch before blocking further requests.

  * Type: int
  * Default: 5
  * Importance: medium

``max.retries``
  The maximum number of retries that are allowed.

  * Type: int
  * Default: 5
  * Importance: low

``retry.backoff.ms``
  How long to wait in milliseconds before attempting to retry a failed batch. This avoids retrying requests in a tight loop under certain failure scenarios.

  * Type: long
  * Default: 100
  * Importance: low
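
The two retry options above amount to a bounded-retry loop around each failed batch. A standalone sketch of that policy (the helper name and structure are illustrative assumptions, not the connector's actual implementation; the real task would also sleep ``retry.backoff.ms`` between attempts):

```java
import java.util.function.IntPredicate;

public class RetrySketch {
    // Illustrative model of the max.retries semantics: the batch is attempted
    // once, then retried up to maxRetries times. Returns the number of
    // attempts used on success, or -1 if the batch failed permanently.
    static int attemptsUsed(int maxRetries, IntPredicate succeedsOnAttempt) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (succeedsOnAttempt.test(attempt)) {
                return attempt + 1;
            }
            // the real connector would wait retry.backoff.ms here before retrying
        }
        return -1;
    }

    public static void main(String[] args) {
        // succeeds on the third attempt (attempt index 2), within max.retries=5
        System.out.println(attemptsUsed(5, a -> a == 2)); // prints 3
    }
}
```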

Data Conversion
^^^^^^^^^^^^^^^

``type.name``
  The Elasticsearch type name to use when indexing.

  * Type: string
  * Importance: high

``key.ignore``
  Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. When this is set to ``true``, document IDs will be generated as the record's ``topic+partition+offset``.

  Note that this is a global config that applies to all topics; use ``topic.key.ignore`` to override it to ``true`` for specific topics.

  * Type: boolean
  * Default: false
  * Importance: high
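
As a concrete illustration of the ID scheme described above, a tiny standalone sketch (a hypothetical helper, not the connector's code) of how a ``topic+partition+offset`` document ID could be formed:

```java
public class DocumentIdSketch {
    // Hypothetical helper mirroring the documented "topic+partition+offset"
    // document ID scheme used when key.ignore=true.
    static String documentId(String topic, int partition, long offset) {
        return topic + "+" + partition + "+" + offset;
    }

    public static void main(String[] args) {
        System.out.println(documentId("logs", 0, 42L)); // prints logs+0+42
    }
}
```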

``schema.ignore``
  Whether to ignore schemas during indexing. When this is set to ``true``, the record schema will be ignored for the purpose of registering an Elasticsearch mapping, and Elasticsearch will infer the mapping from the data (dynamic mapping needs to be enabled by the user).

  Note that this is a global config that applies to all topics; use ``topic.schema.ignore`` to override it to ``true`` for specific topics.

  * Type: boolean
  * Default: false
  * Importance: low

``topic.index.map``
  A map from Kafka topic name to the destination Elasticsearch index, represented as a list of ``topic:index`` pairs.

  * Type: list
  * Default: ""
  * Importance: low

``topic.key.ignore``
  List of topics for which ``key.ignore`` should be ``true``.

  * Type: list
  * Default: ""
  * Importance: low

``topic.schema.ignore``
  List of topics for which ``schema.ignore`` should be ``true``.

  * Type: list
  * Default: ""
  * Importance: low
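
For reference, these options end up in an ordinary Kafka Connect properties file. A minimal sketch (the connector class name, topic names, and index names are illustrative assumptions, not taken from this commit):

```properties
# Hypothetical sink configuration exercising the options documented above
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=logs,metrics

# Connector group
connection.url=http://eshost:9200
batch.size=2000
linger.ms=5
flush.timeout.ms=10000
max.retries=5
retry.backoff.ms=100

# Data Conversion group
type.name=kafka-connect
key.ignore=false
topic.key.ignore=logs
topic.index.map=logs:logs-index,metrics:metrics-index
```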
+1 −1
@@ -69,6 +69,6 @@ public class ElasticsearchSinkConnector extends SinkConnector {

  @Override
  public ConfigDef config() {
    return ElasticsearchSinkConnectorConfig.CONFIG;
  }
}
+79 −104
@@ -26,123 +26,98 @@ import java.util.Map;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static final String CONNECTION_URL_CONFIG = "connection.url";
  public static final String BATCH_SIZE_CONFIG = "batch.size";
  public static final String MAX_IN_FLIGHT_REQUESTS_CONFIG = "max.in.flight.requests";
  public static final String MAX_BUFFERED_RECORDS_CONFIG = "max.buffered.records";
  public static final String LINGER_MS_CONFIG = "linger.ms";
  public static final String FLUSH_TIMEOUT_MS_CONFIG = "flush.timeout.ms";
  public static final String MAX_RETRIES_CONFIG = "max.retries";
  public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";

  public static final String TYPE_NAME_CONFIG = "type.name";
  public static final String KEY_IGNORE_CONFIG = "key.ignore";
  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  // TODO: remove this config when single message transform is in
  public static final String TOPIC_INDEX_MAP_CONFIG = "topic.index.map";
  public static final String TOPIC_KEY_IGNORE_CONFIG = "topic.key.ignore";
  public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();

    {
      final String group = "Connector";
      int order = 0;
      configDef
          .define(CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH,
                  "Elasticsearch HTTP connection URL e.g. ``http://eshost:9200``.",
                  group, ++order, Width.LONG, "Connection URL")
          .define(BATCH_SIZE_CONFIG, Type.INT, 2000, Importance.MEDIUM,
                  "The number of records to process as a batch when writing to Elasticsearch.",
                  group, ++order, Width.SHORT, "Batch Size")
          .define(MAX_IN_FLIGHT_REQUESTS_CONFIG, Type.INT, 5, Importance.MEDIUM,
                  "The maximum number of indexing requests that can be in-flight to Elasticsearch before blocking further requests.",
                  group, ++order, Width.SHORT, "Max In-flight Requests")
          .define(MAX_BUFFERED_RECORDS_CONFIG, Type.INT, 20000, Importance.LOW,
                  "The maximum number of records each task will buffer before blocking acceptance of more records. This config can be used to limit the memory usage for each task.",
                  group, ++order, Width.SHORT, "Max Buffered Records")
          .define(LINGER_MS_CONFIG, Type.LONG, 1L, Importance.LOW,
                  "Linger time in milliseconds for batching.\n"
                  + "Records that arrive in between request transmissions are batched into a single bulk indexing request, based on the ``" + BATCH_SIZE_CONFIG + "`` configuration. "
                  + "Normally this only occurs under load when records arrive faster than they can be sent out. "
                  + "However it may be desirable to reduce the number of requests even under light load and benefit from bulk indexing. "
                  + "This setting helps accomplish that - rather than immediately sending out a record, "
                  + "the task will wait up to the given delay to allow other records to be added so that they can be batched into a single request.",
                  group, ++order, Width.SHORT, "Linger (ms)")
          .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, 10000L, Importance.LOW,
                  "The timeout in milliseconds when flushing records to Elasticsearch.",
                  group, ++order, Width.SHORT, "Flush Timeout (ms)")
          .define(MAX_RETRIES_CONFIG, Type.INT, 5, Importance.LOW,
                  "The maximum number of retries that are allowed.",
                  group, ++order, Width.SHORT, "Max Retries")
          .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, Importance.LOW,
                  "How long to wait in milliseconds before attempting to retry a failed batch. This avoids retrying requests in a tight loop under certain failure scenarios.",
                  group, ++order, Width.SHORT, "Retry Backoff (ms)");
    }

    {
      final String group = "Data Conversion";
      int order = 0;
      configDef
          .define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH,
                  "The Elasticsearch type name to use when indexing.",
                  group, ++order, Width.SHORT, "Type Name")
          .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, false, Importance.HIGH,
                  "Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. "
                  + "When this is set to ``true``, document IDs will be generated as the record's ``topic+partition+offset``.\n"
                  + "Note that this is a global config that applies to all topics; use ``" + TOPIC_KEY_IGNORE_CONFIG + "`` to override it to ``true`` for specific topics.",
                  group, ++order, Width.SHORT, "Ignore Key mode")
          .define(SCHEMA_IGNORE_CONFIG, Type.BOOLEAN, false, Importance.LOW,
                  "Whether to ignore schemas during indexing. "
                  + "When this is set to ``true``, the record schema will be ignored for the purpose of registering an Elasticsearch mapping. "
                  + "Elasticsearch will infer the mapping from the data (dynamic mapping needs to be enabled by the user).\n"
                  + "Note that this is a global config that applies to all topics; use ``" + TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override it to ``true`` for specific topics.",
                  group, ++order, Width.SHORT, "Ignore Schema mode")
          .define(TOPIC_INDEX_MAP_CONFIG, Type.LIST, "", Importance.LOW,
                  "A map from Kafka topic name to the destination Elasticsearch index, represented as a list of ``topic:index`` pairs.",
                  group, ++order, Width.LONG, "Topic to Index Map")
          .define(TOPIC_KEY_IGNORE_CONFIG, Type.LIST, "", Importance.LOW,
                  "List of topics for which ``" + KEY_IGNORE_CONFIG + "`` should be ``true``.",
                  group, ++order, Width.LONG, "Topics for 'Ignore Key' mode")
          .define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, "", Importance.LOW,
                  "List of topics for which ``" + SCHEMA_IGNORE_CONFIG + "`` should be ``true``.",
                  group, ++order, Width.LONG, "Topics for 'Ignore Schema' mode");
    }

    return configDef;
  }

  public static final ConfigDef CONFIG = baseConfigDef();

  public ElasticsearchSinkConnectorConfig(Map<String, String> props) {
    super(CONFIG, props);
  }

  public static void main(String[] args) {
    System.out.println(CONFIG.toRst());
  }
}