Commit 07dd1c4a authored by Liquan Pei's avatar Liquan Pei

Add configuration options

parent 03c67871
Configuration Options
---------------------
``connection.url``
  The URL to connect to Elasticsearch.

  * Type: string
  * Default: ""
  * Importance: high

``type.name``
  The type to use for each index.

  * Type: string
  * Default: ""
  * Importance: high

``key.ignore``
  Whether to ignore the key during indexing. When this is set to true, only the value from the message will be written to Elasticsearch. Note that this is a global config that applies to all topics. Use ``topic.key.ignore`` to configure this for individual topics; the per-topic configuration overrides this value.

  * Type: boolean
  * Default: false
  * Importance: high
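
  For example, to keep using message keys as document ids for most topics while ignoring keys for a few topics, a configuration along these lines could be used (topic names are illustrative):

  ```properties
  # Use message keys as document ids by default
  key.ignore=false
  # Ignore keys only for these topics; their document ids are
  # derived from topic+partition+offset instead
  topic.key.ignore=logs,metrics
  ```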

``batch.size``
  The number of requests to process as a batch when writing to Elasticsearch.

  * Type: int
  * Default: 10000
  * Importance: medium

``max.in.flight.requests``
  The maximum number of incomplete batches each task will send before blocking. Note that if this is set to a value greater than 1 and there are failed sends, there is a risk of message reordering due to retries.

  * Type: int
  * Default: 5
  * Importance: medium

``flush.timeout.ms``
  The timeout when flushing data to Elasticsearch.

  * Type: long
  * Default: 10000
  * Importance: low

``linger.ms``
  The task groups any records that arrive between request transmissions into a single batched request. Normally this occurs only under load, when records arrive faster than they can be sent out. However, in some circumstances the task may want to reduce the number of requests even under moderate load. This setting accomplishes that by adding a small amount of artificial delay: rather than immediately sending out a record, the task will wait for up to the given delay to allow other records to accumulate, so that the sends can be batched together.

  * Type: long
  * Default: 1
  * Importance: low
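
The interaction of ``batch.size`` and ``linger.ms`` described above can be sketched as follows. This is a minimal illustration of the batching policy, not the connector's implementation; the function and parameter names are invented for this example:

```python
import time

def batch_records(incoming, batch_size, linger_ms):
    """Group records into batches. A batch is flushed when it reaches
    batch_size, or when linger_ms has elapsed since its first record
    arrived -- whichever comes first. Illustrative sketch only."""
    batches = []
    current = []
    deadline = None
    for record in incoming:
        if not current:
            # First record of a new batch starts the linger timer.
            deadline = time.monotonic() + linger_ms / 1000.0
        current.append(record)
        if len(current) >= batch_size or time.monotonic() >= deadline:
            batches.append(current)
            current = []
    if current:
        # Flush any trailing partial batch.
        batches.append(current)
    return batches
```

With a long linger, batches fill up to ``batch_size``; with ``linger_ms=0``, every record is sent immediately in its own request.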

``max.buffered.records``
  Approximately the maximum number of records each task will buffer. This config controls the memory usage for each task. When the number of buffered records exceeds this value, the partitions assigned to the task will be paused.

  * Type: long
  * Default: 100000
  * Importance: low
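
The backpressure behavior described above can be sketched as follows. This is an illustrative model, not the connector's code; the class and method names are invented, and the comments indicate where a real task would call the consumer pause/resume APIs:

```python
class BufferedWriter:
    """Sketch of buffer-based backpressure: when buffered records
    exceed the limit, the task pauses its partitions until the
    buffer drains below the limit again."""

    def __init__(self, max_buffered_records):
        self.max_buffered_records = max_buffered_records
        self.buffer = []
        self.paused = False

    def put(self, record):
        self.buffer.append(record)
        if len(self.buffer) > self.max_buffered_records:
            # A real task would pause its assigned partitions here.
            self.paused = True

    def drain(self, n):
        # Simulate n records being successfully written to Elasticsearch.
        flushed, self.buffer = self.buffer[:n], self.buffer[n:]
        if len(self.buffer) <= self.max_buffered_records:
            # A real task would resume its assigned partitions here.
            self.paused = False
        return flushed
```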

``max.retry``
  The maximum number of retries allowed. Allowing retries can change the ordering of records.

  * Type: int
  * Default: 5
  * Importance: low

``retry.backoff.ms``
  The amount of time to wait before attempting to retry a failed batch. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

  * Type: long
  * Default: 100
  * Importance: low

``schema.ignore``
  Whether to ignore schemas during indexing. When this is set to true, the schema in ``SinkRecord`` will be ignored and Elasticsearch will infer the mapping from the data. Note that this is a global config that applies to all topics. Use ``topic.schema.ignore`` to configure this for individual topics; the per-topic configuration overrides this value.

  * Type: boolean
  * Default: false
  * Importance: low

``topic.index.map``
  The map between Kafka topics and Elasticsearch indices.

  * Type: list
  * Default: []
  * Importance: low

``topic.key.ignore``
  A list of topics for which to ignore the key when indexing. If the key for a topic can be null, include the topic in this list so that a valid document id can be generated.

  * Type: list
  * Default: []
  * Importance: low

``topic.schema.ignore``
  A list of topics for which to ignore the schema when indexing.

  * Type: list
  * Default: []
  * Importance: low
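
Putting the options together, a minimal sink connector configuration might look like the following. This is an illustrative sketch; the connector name, topics, and index mapping are made up for the example:

```properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=orders,logs
connection.url=http://localhost:9200
type.name=kafka-connect
# Use keys as document ids except for the logs topic
key.ignore=false
topic.key.ignore=logs
# Infer mappings from message schemas
schema.ignore=false
batch.size=10000
linger.ms=1
max.buffered.records=100000
# Route the orders topic to a differently named index
topic.index.map=orders:orders-index
```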
@@ -9,9 +9,9 @@ Elasticsearch is often used for text queries, analytics and as an key-value stor
 (`use cases <https://www.elastic.co/blog/found-uses-of-elasticsearch>`_). The connector covers
 both the analytics and key-value store use cases. For the analytics use case,
 each message is in Kafka is treated as an event and the connector uses ``topic+partition+offset``
-as unique identifiers for events, which then converted to unique documents in Elasticsearch.
+as a unique identifier for events, which then converted to unique documents in Elasticsearch.
 For the key-value store use case, it supports using keys from Kafka messages as document ids in
-Elasticsearch and provides configurations ensuring that updates to a key is written to Elasticsearch
+Elasticsearch and provides configurations ensuring that updates to a key are written to Elasticsearch
 in order. For both use cases, Elasticsearch's idempotent write semantics guarantees exactly once
 delivery.

@@ -19,7 +19,7 @@ delivery.
 process of defining how a document, and the fields it contains, are stored and indexed. Users can
 explicitly define mappings for types in indices. When mapping is not explicitly defined,
 Elasticsearch can determine field names and types from data, however, some types such as timestamp
-and decimal may not be correctly inferred. To ensure that the types are correctly inferred, the
+and decimal, may not be correctly inferred. To ensure that the types are correctly inferred, the
 connector provides a feature to infer mapping from the schemas of Kafka messages.

Quickstart
@@ -171,5 +171,5 @@ connector jobs to achieve double writes:
    1. The connector job that ingest data to the old indices continue writing to the old indices.
    2. Create a new connector job that writes to new indices. This will copy both some old data and
       new data to the new indices as long as the data is in Kafka.
-   4. Once the data in the old indices are moved to the new indices by the reindexing process, we
+   3. Once the data in the old indices are moved to the new indices by the reindexing process, we
       can stop the old connector job.
@@ -41,11 +41,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
   private static final String KEY_IGNORE_DOC =
       "Whether to ignore the key during indexing. When this is set to true, only the value from the message will be written to Elasticsearch."
       + "Note that this is a global config that applies to all topics. If this is set to true, "
-      + "Use `topic.key.ignore` to config for different topics. This value will be overridden by the per topic configuration.";
+      + "Use ``topic.key.ignore`` to config for different topics. This value will be overridden by the per topic configuration.";
   private static final boolean KEY_IGNORE_DEFAULT = false;
   private static final String KEY_IGNORE_DISPLAY = "Ignore Key";
 
-  // TODO: remove thid config when single message transform is in
+  // TODO: remove this config when single message transform is in
   public static final String TOPIC_INDEX_MAP_CONFIG = "topic.index.map";
   private static final String TOPIC_INDEX_MAP_DOC = "The map between Kafka topics and Elasticsearch indices.";
   private static final String TOPIC_INDEX_MAP_DEFAULT = "";
@@ -106,9 +106,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
 
   public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
   private static final String SCHEMA_IGNORE_DOC =
-      "Whether to ignore schemas during indexing. When this is set to true, the schema in `SinkRecord` will be ignored and Elasticsearch will infer the mapping from data. "
+      "Whether to ignore schemas during indexing. When this is set to true, the schema in ``SinkRecord`` will be ignored and Elasticsearch will infer the mapping from data. "
       + "Note that this is a global config that applies to all topics."
-      + "Use `topic.schema.ignore` to config for different topics. This value will be overridden by the per topic configuration.";
+      + "Use ``topic.schema.ignore`` to config for different topics. This value will be overridden by the per topic configuration.";
   private static final boolean SCHEMA_IGNORE_DEFAULT = false;
   private static final String SCHEMA_IGNORE_DISPLAY = "Ignore Schema";