Commit f7f2e10c authored by k.privezentsev's avatar k.privezentsev
Browse files

move definition for "drop.invalid.message" config option in new place

parent 80141587
Loading
Loading
Loading
Loading
+16 −2
Original line number Diff line number Diff line
@@ -76,6 +76,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  public static final String TOPIC_KEY_IGNORE_CONFIG = "topic.key.ignore";
  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";
  public static final String DROP_INVALID_MESSAGE_CONFIG = "drop.invalid.message";

  private static final String KEY_IGNORE_DOC =
      "Whether to ignore the record key for the purpose of forming the Elasticsearch document ID."
      + " When this is set to ``true``, document IDs will be generated as the record's "
@@ -92,7 +94,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override as ``true`` for specific topics.";
  private static final String TOPIC_SCHEMA_IGNORE_DOC =
      "List of topics for which ``" + SCHEMA_IGNORE_CONFIG + "`` should be ``true``.";
  public static final String DROP_INVALID_MESSAGE = "drop.invalid.message";
  private static final String DROP_INVALID_MESSAGE_DOC =
          "Whether to drop kafka message when it cannot be converted to output message.";


  protected static ConfigDef baseConfigDef() {
@@ -249,7 +252,18 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        ++order,
        Width.LONG,
        "Topics for 'Ignore Schema' mode"
      );
    ).define(
        DROP_INVALID_MESSAGE_CONFIG,
        Type.BOOLEAN,
        false,
        Importance.LOW,
        DROP_INVALID_MESSAGE_DOC,
        group,
        ++order,
        Width.LONG,
        "Drop invalid messages");


  }

  public static final ConfigDef CONFIG = baseConfigDef();
+1 −1
Original line number Diff line number Diff line
@@ -88,7 +88,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      int maxRetry =
          config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
      boolean dropInvalidMessage =
          config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE);
          config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);

      if (client != null) {
        this.client = client;