Loading src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java +7 −0 Original line number Diff line number Diff line Loading @@ -53,6 +53,7 @@ public class ElasticsearchWriter { private final long flushTimeoutMs; private final BulkProcessor<IndexableRecord, ?> bulkProcessor; private final DataConverter converter; private final BehaviorOnNullValues behaviorOnNullValues; private final Set<String> existingMappings; Loading Loading @@ -82,6 +83,7 @@ public class ElasticsearchWriter { this.ignoreSchemaTopics = ignoreSchemaTopics; this.topicToIndexMap = topicToIndexMap; this.flushTimeoutMs = flushTimeoutMs; this.behaviorOnNullValues = behaviorOnNullValues; this.converter = new DataConverter(useCompactMapEntries, behaviorOnNullValues); bulkProcessor = new BulkProcessor<>( Loading Loading @@ -218,6 +220,11 @@ public class ElasticsearchWriter { public void write(Collection<SinkRecord> records) { for (SinkRecord sinkRecord : records) { // Preemptively skip records with null values if they're going to be ignored anyways if (sinkRecord.value() == null && behaviorOnNullValues.equals(BehaviorOnNullValues.IGNORE)) { continue; } final String indexOverride = topicToIndexMap.get(sinkRecord.topic()); final String index = indexOverride != null ? indexOverride : sinkRecord.topic(); final boolean ignoreKey = ignoreKeyTopics.contains(sinkRecord.topic()) || this.ignoreKey; Loading Loading
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java +7 −0 Original line number Diff line number Diff line Loading @@ -53,6 +53,7 @@ public class ElasticsearchWriter { private final long flushTimeoutMs; private final BulkProcessor<IndexableRecord, ?> bulkProcessor; private final DataConverter converter; private final BehaviorOnNullValues behaviorOnNullValues; private final Set<String> existingMappings; Loading Loading @@ -82,6 +83,7 @@ public class ElasticsearchWriter { this.ignoreSchemaTopics = ignoreSchemaTopics; this.topicToIndexMap = topicToIndexMap; this.flushTimeoutMs = flushTimeoutMs; this.behaviorOnNullValues = behaviorOnNullValues; this.converter = new DataConverter(useCompactMapEntries, behaviorOnNullValues); bulkProcessor = new BulkProcessor<>( Loading Loading @@ -218,6 +220,11 @@ public class ElasticsearchWriter { public void write(Collection<SinkRecord> records) { for (SinkRecord sinkRecord : records) { // Preemptively skip records with null values if they're going to be ignored anyways if (sinkRecord.value() == null && behaviorOnNullValues.equals(BehaviorOnNullValues.IGNORE)) { continue; } final String indexOverride = topicToIndexMap.get(sinkRecord.topic()); final String index = indexOverride != null ? indexOverride : sinkRecord.topic(); final boolean ignoreKey = ignoreKeyTopics.contains(sinkRecord.topic()) || this.ignoreKey; Loading