Loading src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java +34 −14 Original line number Diff line number Diff line Loading @@ -219,13 +219,38 @@ public class ElasticsearchWriter { existingMappings.add(index); } final IndexableRecord indexableRecord = tryGetIndexableRecord( sinkRecord, index, ignoreKey, ignoreSchema); if (indexableRecord != null) { bulkProcessor.add(indexableRecord, flushTimeoutMs); } } } private IndexableRecord tryGetIndexableRecord( SinkRecord sinkRecord, String index, boolean ignoreKey, boolean ignoreSchema) { IndexableRecord indexableRecord = null; try { indexableRecord = DataConverter.convertRecord(sinkRecord, index, type, ignoreKey, ignoreSchema); indexableRecord = DataConverter.convertRecord( sinkRecord, index, type, ignoreKey, ignoreSchema); } catch (ConnectException convertException) { if (dropInvalidMessage) { log.error("Can't convert record from topic {} with partition {} and offset {}. Error message: {}", log.error("Can't convert record from topic {} with partition {} and offset {}." + " Error message: {}", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset(), Loading @@ -234,12 +259,7 @@ public class ElasticsearchWriter { throw convertException; } } if (indexableRecord != null) { bulkProcessor.add(indexableRecord, flushTimeoutMs); } } return indexableRecord; } public void flush() { Loading Loading
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java +34 −14 Original line number Diff line number Diff line Loading @@ -219,13 +219,38 @@ public class ElasticsearchWriter { existingMappings.add(index); } final IndexableRecord indexableRecord = tryGetIndexableRecord( sinkRecord, index, ignoreKey, ignoreSchema); if (indexableRecord != null) { bulkProcessor.add(indexableRecord, flushTimeoutMs); } } } private IndexableRecord tryGetIndexableRecord( SinkRecord sinkRecord, String index, boolean ignoreKey, boolean ignoreSchema) { IndexableRecord indexableRecord = null; try { indexableRecord = DataConverter.convertRecord(sinkRecord, index, type, ignoreKey, ignoreSchema); indexableRecord = DataConverter.convertRecord( sinkRecord, index, type, ignoreKey, ignoreSchema); } catch (ConnectException convertException) { if (dropInvalidMessage) { log.error("Can't convert record from topic {} with partition {} and offset {}. Error message: {}", log.error("Can't convert record from topic {} with partition {} and offset {}." + " Error message: {}", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset(), Loading @@ -234,12 +259,7 @@ public class ElasticsearchWriter { throw convertException; } } if (indexableRecord != null) { bulkProcessor.add(indexableRecord, flushTimeoutMs); } } return indexableRecord; } public void flush() { Loading