Commit a053c175 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

Merge branch '3.1.x'

parents dae4acbd e04cc96c
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -95,7 +95,8 @@ public class DataConverter {
    }

    final String payload = new String(JSON_CONVERTER.fromConnectData(record.topic(), schema, value), StandardCharsets.UTF_8);
    return new IndexableRecord(new Key(index, type, id), payload, record.kafkaOffset());
    final Long version = ignoreKey ? null : record.kafkaOffset();
    return new IndexableRecord(new Key(index, type, id), payload, version);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
+8 −7
Original line number Diff line number Diff line
@@ -22,22 +22,23 @@ public class IndexableRecord {

  public final Key key;
  public final String payload;
  // Boxed Long (not long) so it can be null: a null version means "no external
  // versioning" — used when the Kafka offset is part of the document key and
  // therefore cannot serve as a monotonically increasing version.
  public final Long version;

  /**
   * @param key     document coordinates (index, type, id); never null
   * @param payload JSON document body to index
   * @param version external version for optimistic concurrency, or null to
   *                disable version checking on this record
   */
  public IndexableRecord(Key key, String payload, Long version) {
    this.key = key;
    this.version = version;
    this.payload = payload;
  }

  public Index toIndexRequest() {
    return new Index.Builder(payload)
    Index.Builder req = new Index.Builder(payload)
        .index(key.index)
        .type(key.type)
        .id(key.id)
        .setParameter("version_type", "external")
        .setParameter("version", version)
        .build();
        .id(key.id);
    if (version != null) {
      req.setParameter("version_type", "external").setParameter("version", version);
    }
    return req.build();
  }

}
+30 −1
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -152,7 +153,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  }

  @Test
  public void testSafeRedelivery() throws Exception {
  public void testSafeRedeliveryRegularKey() throws Exception {
    final boolean ignoreKey = false;
    final boolean ignoreSchema = false;

@@ -177,6 +178,34 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    verifySearchResults(Collections.singleton(sinkRecord1), ignoreKey, ignoreSchema);
  }

  @Test
  public void testSafeRedeliveryOffsetInKey() throws Exception {
    // With ignoreKey=true the Kafka offset is folded into the document
    // identity/version, so redelivering an already-flushed batch must be
    // rejected by the version-conflict check instead of overwriting documents.
    final boolean ignoreKey = true;
    final boolean ignoreSchema = false;

    final Struct firstMessage = new Struct(schema);
    firstMessage.put("user", "foo");
    firstMessage.put("message", "hi");
    final SinkRecord firstRecord =
        new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, firstMessage, 0);

    final Struct secondMessage = new Struct(schema);
    secondMessage.put("user", "foo");
    secondMessage.put("message", "bye");
    final SinkRecord secondRecord =
        new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, secondMessage, 1);

    final List<SinkRecord> records = Arrays.asList(firstRecord, secondRecord);

    final ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema);
    writer.write(records);
    writer.flush();

    // Deliver the exact same batch a second time.
    writeDataAndRefresh(writer, records);

    // The redelivery should have been ignored due to version conflicts,
    // leaving exactly the originally indexed documents in place.
    verifySearchResults(records, ignoreKey, ignoreSchema);
  }

  @Test
  public void testMap() throws Exception {
    final boolean ignoreKey = false;