Loading src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +7 −2 Original line number Diff line number Diff line Loading @@ -125,20 +125,25 @@ public class ElasticsearchSinkTask extends SinkTask { @Override public void put(Collection<SinkRecord> records) throws ConnectException { log.trace("Putting {} to Elasticsearch.", records); if (writer != null) { writer.write(records); } } @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { log.trace("Flushing data to Elasticsearch with the following offsets: {}", offsets); if (writer != null) { writer.flush(); } } @Override public void close(Collection<TopicPartition> partitions) { log.debug("Closing the task for topic partitions: {}", partitions); if (writer != null) { writer.close(); writer = null; } } Loading Loading
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +7 −2 Original line number Diff line number Diff line Loading @@ -125,20 +125,25 @@ public class ElasticsearchSinkTask extends SinkTask { @Override public void put(Collection<SinkRecord> records) throws ConnectException { log.trace("Putting {} to Elasticsearch.", records); if (writer != null) { writer.write(records); } } @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { log.trace("Flushing data to Elasticsearch with the following offsets: {}", offsets); if (writer != null) { writer.flush(); } } @Override public void close(Collection<TopicPartition> partitions) { log.debug("Closing the task for topic partitions: {}", partitions); if (writer != null) { writer.close(); writer = null; } } Loading