Commit 80141587 authored by k.privezentsev's avatar k.privezentsev

added option to drop invalid message

parent f3068c9a
ElasticsearchSinkConnectorConfig.java  +2 −0
@@ -92,6 +92,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override as ``true`` for specific topics.";
  private static final String TOPIC_SCHEMA_IGNORE_DOC =
      "List of topics for which ``" + SCHEMA_IGNORE_CONFIG + "`` should be ``true``.";
+  public static final String DROP_INVALID_MESSAGE = "drop.invalid.message";
+

  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
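
Note: the hunk above shows only the new DROP_INVALID_MESSAGE constant; the matching ConfigDef registration falls outside the displayed context. A minimal sketch of how the flag could be wired into baseConfigDef(), assuming a false default so existing deployments keep the old fail-fast behavior (the doc string and importance level are illustrative, not from the commit):

    // Hypothetical registration; default, importance, and doc text are assumptions.
    configDef.define(
        DROP_INVALID_MESSAGE,            // "drop.invalid.message"
        ConfigDef.Type.BOOLEAN,
        false,                           // assumed default: fail the task, as before
        ConfigDef.Importance.LOW,
        "Whether to drop a record that cannot be converted to an Elasticsearch "
            + "document instead of failing the task."
    );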
ElasticsearchSinkTask.java  +5 −1
@@ -64,6 +64,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      boolean ignoreSchema =
          config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);

+
      Map<String, String> topicToIndexMap =
          parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
      Set<String> topicIgnoreKey =
@@ -86,6 +87,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry =
          config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
+      boolean dropInvalidMessage =
+          config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE);

      if (client != null) {
        this.client = client;
@@ -112,7 +115,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setBatchSize(batchSize)
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
-          .setMaxRetry(maxRetry);
+          .setMaxRetry(maxRetry)
+          .setDropInvalidMessage(dropInvalidMessage);

      writer = builder.build();
      writer.start();
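
For context, once the option is registered a connector could enable it as below; a hedged sketch in which the connector class is the stock Confluent Elasticsearch sink and the topic and URL are placeholders:

    // Hypothetical connector properties; only "drop.invalid.message" comes from this commit.
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
    props.put("topics", "example-topic");
    props.put("connection.url", "http://localhost:9200");
    props.put("type.name", "kafka-connect");
    props.put("drop.invalid.message", "true");  // log and skip unconvertible records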
ElasticsearchWriter.java  +31 −10
@@ -49,6 +49,7 @@ public class ElasticsearchWriter {
  private final Map<String, String> topicToIndexMap;
  private final long flushTimeoutMs;
  private final BulkProcessor<IndexableRecord, ?> bulkProcessor;
+  private final boolean dropInvalidMessage;

  private final Set<String> existingMappings;

@@ -66,7 +67,8 @@ public class ElasticsearchWriter {
      int batchSize,
      long lingerMs,
      int maxRetries,
-      long retryBackoffMs
+      long retryBackoffMs,
+      boolean dropInvalidMessage
  ) {
    this.client = client;
    this.type = type;
@@ -76,6 +78,7 @@ public class ElasticsearchWriter {
    this.ignoreSchemaTopics = ignoreSchemaTopics;
    this.topicToIndexMap = topicToIndexMap;
    this.flushTimeoutMs = flushTimeoutMs;
+    this.dropInvalidMessage = dropInvalidMessage;

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -106,6 +109,7 @@ public class ElasticsearchWriter {
    private long lingerMs;
    private int maxRetry;
    private long retryBackoffMs;
+    private boolean dropInvalidMessage;

    public Builder(JestClient client) {
      this.client = client;
@@ -168,6 +172,11 @@ public class ElasticsearchWriter {
      return this;
    }

+    public Builder setDropInvalidMessage(boolean dropInvalidMessage) {
+      this.dropInvalidMessage = dropInvalidMessage;
+      return this;
+    }
+
    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client,
@@ -183,7 +192,8 @@ public class ElasticsearchWriter {
          batchSize,
          lingerMs,
          maxRetry,
-          retryBackoffMs
+          retryBackoffMs,
+          dropInvalidMessage
      );
    }
  }
@@ -209,16 +219,27 @@ public class ElasticsearchWriter {
        existingMappings.add(index);
      }

-      final IndexableRecord indexableRecord = DataConverter.convertRecord(
-          sinkRecord,
-          index,
-          type,
-          ignoreKey,
-          ignoreSchema
-      );
+      IndexableRecord indexableRecord = null;
+
+      try {
+        indexableRecord = DataConverter.convertRecord(sinkRecord, index, type, ignoreKey, ignoreSchema);
+      } catch (ConnectException convertException) {
+        if (dropInvalidMessage) {
+          log.error("Can't convert record from topic {} with partition {} and offset {}. Error message: {}",
+                  sinkRecord.topic(),
+                  sinkRecord.kafkaPartition(),
+                  sinkRecord.kafkaOffset(),
+                  convertException.getMessage());
+        } else {
+          throw convertException;
+        }
+      }
+
+      if (indexableRecord != null) {
+        bulkProcessor.add(indexableRecord, flushTimeoutMs);
+      }

    }
  }

  public void flush() {
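
The catch block above intercepts only ConnectException thrown during conversion (for example, a null key when keys are used as document ids); bulk-indexing failures are still handled by the BulkProcessor retries. Wiring a drop-enabled writer by hand might look like the following sketch, which uses the Builder methods visible in this diff but omits the other options a real task would set:

    // Minimal sketch; the type name is a placeholder and most builder options are omitted.
    ElasticsearchWriter writer = new ElasticsearchWriter.Builder(client)
        .setType("kafka-connect")
        .setDropInvalidMessage(true)     // new flag: log and skip unconvertible records
        .build();
    writer.start();
    writer.write(records);               // invalid records no longer fail the task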
ElasticsearchWriterTest.java  +59 −3
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Rule;
import org.junit.Test;

import java.math.BigDecimal;
@@ -40,6 +41,7 @@ import java.util.Map;
import java.util.Set;

import io.searchbox.client.JestClient;
+import org.junit.rules.ExpectedException;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
@@ -50,6 +52,9 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  private final Schema otherSchema = createOtherSchema();
  private final Struct otherRecord = createOtherRecord(otherSchema);

+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
  @Test
  public void testWriter() throws Exception {
    final boolean ignoreKey = false;
@@ -93,7 +98,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    final String indexOverride = "index";

    Collection<SinkRecord> records = prepareData(2);
-    ElasticsearchWriter writer = initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.singletonMap(TOPIC, indexOverride));
+    ElasticsearchWriter writer = initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.singletonMap(TOPIC, indexOverride), false);
    writeDataAndRefresh(writer, records);
    verifySearchResults(records, indexOverride, ignoreKey, ignoreSchema);
  }
@@ -298,6 +303,52 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    verifySearchResults(records, ignoreKey, ignoreSchema);
  }

+  @Test
+  public void testInvalidRecordException() throws Exception {
+    final boolean ignoreKey = false;
+    final boolean ignoreSchema = true;
+
+    Collection<SinkRecord> records = new ArrayList<>();
+
+    SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, null, new byte[]{42}, 0);
+    records.add(sinkRecord);
+
+    final ElasticsearchWriter strictWriter = initWriter(client, ignoreKey, ignoreSchema, false);
+
+    thrown.expect(ConnectException.class);
+    thrown.expectMessage("Key is used as document id and can not be null");
+    strictWriter.write(records);
+  }
+
+  @Test
+  public void testDropInvalidRecord() throws Exception {
+    final boolean ignoreKey = false;
+    final boolean ignoreSchema = true;
+    Collection<SinkRecord> inputRecords = new ArrayList<>();
+    Collection<SinkRecord> outputRecords = new ArrayList<>();
+
+    Schema structSchema = SchemaBuilder.struct().name("struct")
+            .field("bytes", SchemaBuilder.BYTES_SCHEMA)
+            .build();
+
+    Struct struct = new Struct(structSchema);
+    struct.put("bytes", new byte[]{42});
+
+    SinkRecord invalidRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, structSchema, struct, 0);
+    SinkRecord validRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 1);
+
+    inputRecords.add(validRecord);
+    inputRecords.add(invalidRecord);
+
+    outputRecords.add(validRecord);
+
+    final ElasticsearchWriter nonStrictWriter = initWriter(client, ignoreKey, ignoreSchema, true);
+
+    writeDataAndRefresh(nonStrictWriter, inputRecords);
+    verifySearchResults(outputRecords, ignoreKey, ignoreSchema);
+  }
+
  private Collection<SinkRecord> prepareData(int numRecords) {
    Collection<SinkRecord> records = new ArrayList<>();
    for (int i = 0; i < numRecords; ++i) {
@@ -308,10 +359,14 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  }

  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, boolean ignoreSchema) {
-    return initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.<String, String>emptyMap());
+    return initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.<String, String>emptyMap(), false);
  }

+  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, boolean ignoreSchema, boolean dropInvalidMessage) {
+    return initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.<String, String>emptyMap(), dropInvalidMessage);
+  }
+
-  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, Set<String> ignoreKeyTopics, boolean ignoreSchema, Set<String> ignoreSchemaTopics, Map<String, String> topicToIndexMap) {
+  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, Set<String> ignoreKeyTopics, boolean ignoreSchema, Set<String> ignoreSchemaTopics, Map<String, String> topicToIndexMap, boolean dropInvalidMessage) {
    ElasticsearchWriter writer = new ElasticsearchWriter.Builder(client)
        .setType(TYPE)
        .setIgnoreKey(ignoreKey, ignoreKeyTopics)
@@ -324,6 +379,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
        .setLingerMs(1000)
        .setRetryBackoffMs(1000)
        .setMaxRetry(3)
+        .setDropInvalidMessage(dropInvalidMessage)
        .build();
    writer.start();
    writer.createIndicesForTopics(Collections.singleton(TOPIC));