Commit 58ae0ddd authored by Ewen Cheslack-Postava's avatar Ewen Cheslack-Postava Committed by GitHub
Browse files

Merge pull request #91 from KL-WLCR/f/drop_invalid_record

Drop invalid messages if needed
parents f3068c9a 733f9e24
Loading
Loading
Loading
Loading
+17 −1
Original line number Diff line number Diff line
@@ -76,6 +76,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  public static final String TOPIC_KEY_IGNORE_CONFIG = "topic.key.ignore";
  public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
  public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";
  public static final String DROP_INVALID_MESSAGE_CONFIG = "drop.invalid.message";

  private static final String KEY_IGNORE_DOC =
      "Whether to ignore the record key for the purpose of forming the Elasticsearch document ID."
      + " When this is set to ``true``, document IDs will be generated as the record's "
@@ -92,6 +94,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
      + TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override as ``true`` for specific topics.";
  private static final String TOPIC_SCHEMA_IGNORE_DOC =
      "List of topics for which ``" + SCHEMA_IGNORE_CONFIG + "`` should be ``true``.";
  private static final String DROP_INVALID_MESSAGE_DOC =
          "Whether to drop kafka message when it cannot be converted to output message.";


  protected static ConfigDef baseConfigDef() {
    final ConfigDef configDef = new ConfigDef();
@@ -247,7 +252,18 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        ++order,
        Width.LONG,
        "Topics for 'Ignore Schema' mode"
      );
    ).define(
        DROP_INVALID_MESSAGE_CONFIG,
        Type.BOOLEAN,
        false,
        Importance.LOW,
        DROP_INVALID_MESSAGE_DOC,
        group,
        ++order,
        Width.LONG,
        "Drop invalid messages");


  }

  public static final ConfigDef CONFIG = baseConfigDef();
+5 −1
Original line number Diff line number Diff line
@@ -64,6 +64,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      boolean ignoreSchema =
          config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);


      Map<String, String> topicToIndexMap =
          parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
      Set<String> topicIgnoreKey =
@@ -86,6 +87,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      int maxRetry =
          config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
      boolean dropInvalidMessage =
          config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);

      if (client != null) {
        this.client = client;
@@ -112,7 +115,8 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setBatchSize(batchSize)
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry);
          .setMaxRetry(maxRetry)
          .setDropInvalidMessage(dropInvalidMessage);

      writer = builder.build();
      writer.start();
+51 −10
Original line number Diff line number Diff line
@@ -49,6 +49,7 @@ public class ElasticsearchWriter {
  private final Map<String, String> topicToIndexMap;
  private final long flushTimeoutMs;
  private final BulkProcessor<IndexableRecord, ?> bulkProcessor;
  private final boolean dropInvalidMessage;

  private final Set<String> existingMappings;

@@ -66,7 +67,8 @@ public class ElasticsearchWriter {
      int batchSize,
      long lingerMs,
      int maxRetries,
      long retryBackoffMs
      long retryBackoffMs,
      boolean dropInvalidMessage
  ) {
    this.client = client;
    this.type = type;
@@ -76,6 +78,7 @@ public class ElasticsearchWriter {
    this.ignoreSchemaTopics = ignoreSchemaTopics;
    this.topicToIndexMap = topicToIndexMap;
    this.flushTimeoutMs = flushTimeoutMs;
    this.dropInvalidMessage = dropInvalidMessage;

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
@@ -106,6 +109,7 @@ public class ElasticsearchWriter {
    private long lingerMs;
    private int maxRetry;
    private long retryBackoffMs;
    private boolean dropInvalidMessage;

    public Builder(JestClient client) {
      this.client = client;
@@ -168,6 +172,11 @@ public class ElasticsearchWriter {
      return this;
    }

    /**
     * Sets whether records that cannot be converted into an indexable document
     * should be dropped (logged and skipped) rather than failing the task.
     *
     * @param dropInvalidMessage {@code true} to drop unconvertible records;
     *        {@code false} to propagate the conversion exception
     * @return this builder, for call chaining
     */
    public Builder setDropInvalidMessage(boolean dropInvalidMessage) {
      this.dropInvalidMessage = dropInvalidMessage;
      return this;
    }

    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client,
@@ -183,7 +192,8 @@ public class ElasticsearchWriter {
          batchSize,
          lingerMs,
          maxRetry,
          retryBackoffMs
          retryBackoffMs,
          dropInvalidMessage
      );
    }
  }
@@ -209,16 +219,47 @@ public class ElasticsearchWriter {
        existingMappings.add(index);
      }

      final IndexableRecord indexableRecord = DataConverter.convertRecord(
      final IndexableRecord indexableRecord = tryGetIndexableRecord(
              sinkRecord,
              index,
          type,
              ignoreKey,
          ignoreSchema
      );
              ignoreSchema);

      if (indexableRecord != null) {
        bulkProcessor.add(indexableRecord, flushTimeoutMs);
      }

    }
  }

  /**
   * Attempts to convert a Kafka {@link SinkRecord} into an {@link IndexableRecord}.
   *
   * <p>If conversion fails with a {@link ConnectException} and
   * {@code dropInvalidMessage} is enabled, the record is dropped: the failure is
   * logged (with topic/partition/offset for traceability) and {@code null} is
   * returned so the caller can skip it. Otherwise the exception is rethrown and
   * fails the task.
   *
   * @param sinkRecord   the record to convert
   * @param index        the target Elasticsearch index
   * @param ignoreKey    whether to ignore the record key when forming the document id
   * @param ignoreSchema whether to ignore the record schema
   * @return the converted record, or {@code null} if it was dropped
   * @throws ConnectException if conversion fails and dropping is disabled
   */
  private IndexableRecord tryGetIndexableRecord(
          SinkRecord sinkRecord,
          String index,
          boolean ignoreKey,
          boolean ignoreSchema) {

    IndexableRecord indexableRecord = null;

    try {
      indexableRecord = DataConverter.convertRecord(
              sinkRecord,
              index,
              type,
              ignoreKey,
              ignoreSchema);
    } catch (ConnectException convertException) {
      if (dropInvalidMessage) {
        // Pass the exception as the last SLF4J argument so the full stack
        // trace and cause chain are preserved, not just getMessage().
        log.error("Can't convert record from topic {} with partition {} and offset {}. Dropping it.",
                  sinkRecord.topic(),
                  sinkRecord.kafkaPartition(),
                  sinkRecord.kafkaOffset(),
                  convertException);
      } else {
        throw convertException;
      }
    }
    return indexableRecord;
  }

  public void flush() {
+59 −3
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Rule;
import org.junit.Test;

import java.math.BigDecimal;
@@ -40,6 +41,7 @@ import java.util.Map;
import java.util.Set;

import io.searchbox.client.JestClient;
import org.junit.rules.ExpectedException;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
@@ -50,6 +52,9 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  private final Schema otherSchema = createOtherSchema();
  private final Struct otherRecord = createOtherRecord(otherSchema);

  @Rule
  public ExpectedException thrown = ExpectedException.none();

  @Test
  public void testWriter() throws Exception {
    final boolean ignoreKey = false;
@@ -93,7 +98,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    final String indexOverride = "index";

    Collection<SinkRecord> records = prepareData(2);
    ElasticsearchWriter writer = initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.singletonMap(TOPIC, indexOverride));
    ElasticsearchWriter writer = initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.singletonMap(TOPIC, indexOverride), false);
    writeDataAndRefresh(writer, records);
    verifySearchResults(records, indexOverride, ignoreKey, ignoreSchema);
  }
@@ -298,6 +303,52 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    verifySearchResults(records, ignoreKey, ignoreSchema);
  }

  @Test
  public void testInvalidRecordException() throws Exception {
    final boolean ignoreKey = false;
    final boolean ignoreSchema = true;

    Collection<SinkRecord> records = new ArrayList<>();

    // Null key with ignoreKey=false: the key is required to form the document
    // id, so conversion of this record must fail.
    SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, null, new byte[]{42}, 0);
    records.add(sinkRecord);

    // dropInvalidMessage=false ("strict"): the conversion failure must
    // propagate out of write() instead of being swallowed.
    final ElasticsearchWriter strictWriter = initWriter(client, ignoreKey, ignoreSchema, false);

    thrown.expect(ConnectException.class);
    thrown.expectMessage("Key is used as document id and can not be null");
    strictWriter.write(records);
  }

  @Test
  public void testDropInvalidRecord() throws Exception {
    final boolean ignoreKey = false;
    final boolean ignoreSchema = true;
    Collection<SinkRecord> inputRecords = new ArrayList<>();
    Collection<SinkRecord> outputRecords = new ArrayList<>();

    Schema structSchema = SchemaBuilder.struct().name("struct")
            .field("bytes", SchemaBuilder.BYTES_SCHEMA)
            .build();

    Struct struct = new Struct(structSchema);
    struct.put("bytes", new byte[]{42});


    // invalidRecord has a null key while ignoreKey=false, so it cannot be
    // converted; validRecord is identical except for a usable key.
    SinkRecord invalidRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, structSchema, struct,  0);
    SinkRecord validRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key,  structSchema, struct, 1);

    inputRecords.add(validRecord);
    inputRecords.add(invalidRecord);

    // Only the valid record is expected to reach Elasticsearch.
    outputRecords.add(validRecord);

    // dropInvalidMessage=true: the unconvertible record should be dropped
    // silently (logged) and the write must still succeed.
    final ElasticsearchWriter nonStrictWriter = initWriter(client, ignoreKey, ignoreSchema, true);

    writeDataAndRefresh(nonStrictWriter, inputRecords);
    verifySearchResults(outputRecords, ignoreKey, ignoreSchema);
  }

  private Collection<SinkRecord> prepareData(int numRecords) {
    Collection<SinkRecord> records = new ArrayList<>();
    for (int i = 0; i < numRecords; ++i) {
@@ -308,10 +359,14 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  }

  // Convenience overload: builds a writer that does NOT drop invalid messages.
  // (The rendered diff kept both the pre- and post-merge return lines, leaving
  // two consecutive return statements; only the post-merge delegation with the
  // explicit dropInvalidMessage=false default is kept here.)
  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, boolean ignoreSchema) {
    return initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.<String, String>emptyMap(), false);
  }

  // Convenience overload: no per-topic key/schema overrides and no index
  // remapping, but with an explicit dropInvalidMessage setting.
  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, boolean ignoreSchema, boolean dropInvalidMessage) {
    return initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.<String, String>emptyMap(), dropInvalidMessage);
  }

  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, Set<String> ignoreKeyTopics, boolean ignoreSchema, Set<String> ignoreSchemaTopics, Map<String, String> topicToIndexMap) {
  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, Set<String> ignoreKeyTopics, boolean ignoreSchema, Set<String> ignoreSchemaTopics, Map<String, String> topicToIndexMap, boolean dropInvalidMessage) {
    ElasticsearchWriter writer = new ElasticsearchWriter.Builder(client)
        .setType(TYPE)
        .setIgnoreKey(ignoreKey, ignoreKeyTopics)
@@ -324,6 +379,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
        .setLingerMs(1000)
        .setRetryBackoffMs(1000)
        .setMaxRetry(3)
        .setDropInvalidMessage(dropInvalidMessage)
        .build();
    writer.start();
    writer.createIndicesForTopics(Collections.singleton(TOPIC));