Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +2 −1 Original line number Diff line number Diff line Loading @@ -95,7 +95,8 @@ public class DataConverter { } final String payload = new String(JSON_CONVERTER.fromConnectData(record.topic(), schema, value), StandardCharsets.UTF_8); return new IndexableRecord(new Key(index, type, id), payload, record.kafkaOffset()); final Long version = ignoreKey ? null : record.kafkaOffset(); return new IndexableRecord(new Key(index, type, id), payload, version); } // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch Loading src/main/java/io/confluent/connect/elasticsearch/IndexableRecord.java +8 −7 Original line number Diff line number Diff line Loading @@ -22,22 +22,23 @@ public class IndexableRecord { public final Key key; public final String payload; public final long version; public final Long version; public IndexableRecord(Key key, String payload, long version) { public IndexableRecord(Key key, String payload, Long version) { this.key = key; this.version = version; this.payload = payload; } public Index toIndexRequest() { return new Index.Builder(payload) Index.Builder req = new Index.Builder(payload) .index(key.index) .type(key.type) .id(key.id) .setParameter("version_type", "external") .setParameter("version", version) .build(); .id(key.id); if (version != null) { req.setParameter("version_type", "external").setParameter("version", version); } return req.build(); } } src/test/java/io/confluent/connect/elasticsearch/ElasticsearchWriterTest.java +30 −1 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; Loading Loading @@ -152,7 +153,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { } @Test public void testSafeRedelivery() throws Exception { public void testSafeRedeliveryRegularKey() throws Exception { final boolean ignoreKey = false; final boolean ignoreSchema = false; Loading @@ -177,6 +178,34 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { verifySearchResults(Collections.singleton(sinkRecord1), ignoreKey, ignoreSchema); } @Test public void testSafeRedeliveryOffsetInKey() throws Exception { final boolean ignoreKey = true; final boolean ignoreSchema = false; final Struct value0 = new Struct(schema); value0.put("user", "foo"); value0.put("message", "hi"); final SinkRecord sinkRecord0 = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, value0, 0); final Struct value1 = new Struct(schema); value1.put("user", "foo"); value1.put("message", "bye"); final SinkRecord sinkRecord1 = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, value1, 1); final List<SinkRecord> records = Arrays.asList(sinkRecord0, sinkRecord1); final ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema); writer.write(records); writer.flush(); // write them again writeDataAndRefresh(writer, records); // last write should have been ignored due to version conflict verifySearchResults(records, ignoreKey, ignoreSchema); } @Test public void testMap() throws Exception { final boolean ignoreKey = false; Loading Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +2 −1 Original line number Diff line number Diff line Loading @@ -95,7 +95,8 @@ public class DataConverter { } final String payload = new String(JSON_CONVERTER.fromConnectData(record.topic(), schema, value), StandardCharsets.UTF_8); return new IndexableRecord(new Key(index, type, id), payload, record.kafkaOffset()); final Long version = ignoreKey ? null : record.kafkaOffset(); return new IndexableRecord(new Key(index, type, id), payload, version); } // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch Loading
src/main/java/io/confluent/connect/elasticsearch/IndexableRecord.java +8 −7 Original line number Diff line number Diff line Loading @@ -22,22 +22,23 @@ public class IndexableRecord { public final Key key; public final String payload; public final long version; public final Long version; public IndexableRecord(Key key, String payload, long version) { public IndexableRecord(Key key, String payload, Long version) { this.key = key; this.version = version; this.payload = payload; } public Index toIndexRequest() { return new Index.Builder(payload) Index.Builder req = new Index.Builder(payload) .index(key.index) .type(key.type) .id(key.id) .setParameter("version_type", "external") .setParameter("version", version) .build(); .id(key.id); if (version != null) { req.setParameter("version_type", "external").setParameter("version", version); } return req.build(); } }
src/test/java/io/confluent/connect/elasticsearch/ElasticsearchWriterTest.java +30 −1 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; Loading Loading @@ -152,7 +153,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { } @Test public void testSafeRedelivery() throws Exception { public void testSafeRedeliveryRegularKey() throws Exception { final boolean ignoreKey = false; final boolean ignoreSchema = false; Loading @@ -177,6 +178,34 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { verifySearchResults(Collections.singleton(sinkRecord1), ignoreKey, ignoreSchema); } @Test public void testSafeRedeliveryOffsetInKey() throws Exception { final boolean ignoreKey = true; final boolean ignoreSchema = false; final Struct value0 = new Struct(schema); value0.put("user", "foo"); value0.put("message", "hi"); final SinkRecord sinkRecord0 = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, value0, 0); final Struct value1 = new Struct(schema); value1.put("user", "foo"); value1.put("message", "bye"); final SinkRecord sinkRecord1 = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, value1, 1); final List<SinkRecord> records = Arrays.asList(sinkRecord0, sinkRecord1); final ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema); writer.write(records); writer.flush(); // write them again writeDataAndRefresh(writer, records); // last write should have been ignored due to version conflict verifySearchResults(records, ignoreKey, ignoreSchema); } @Test public void testMap() throws Exception { final boolean ignoreKey = false; Loading