Loading src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +7 −2 Original line number Diff line number Diff line Loading @@ -125,20 +125,25 @@ public class ElasticsearchSinkTask extends SinkTask { @Override public void put(Collection<SinkRecord> records) throws ConnectException { log.trace("Putting {} to Elasticsearch.", records); if (writer != null) { writer.write(records); } } @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { log.trace("Flushing data to Elasticsearch with the following offsets: {}", offsets); if (writer != null) { writer.flush(); } } @Override public void close(Collection<TopicPartition> partitions) { log.debug("Closing the task for topic partitions: {}", partitions); if (writer != null) { writer.close(); writer = null; } } Loading src/test/java/io/confluent/connect/elasticsearch/BulkProcessorTest.java +1 −1 Original line number Diff line number Diff line Loading @@ -43,7 +43,7 @@ public class BulkProcessorTest { private final String type = "connect"; private final String topic = "topic"; private final int partition = 0; private final long flushTimeoutMs = 20000; private final long flushTimeoutMs = 30000; private final long lingerMs = 2000; private final int maxRetry = 5; private final long retryBackoffMs = 3000; Loading Loading
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +7 −2 Original line number Diff line number Diff line Loading @@ -125,20 +125,25 @@ public class ElasticsearchSinkTask extends SinkTask { @Override public void put(Collection<SinkRecord> records) throws ConnectException { log.trace("Putting {} to Elasticsearch.", records); if (writer != null) { writer.write(records); } } @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { log.trace("Flushing data to Elasticsearch with the following offsets: {}", offsets); if (writer != null) { writer.flush(); } } @Override public void close(Collection<TopicPartition> partitions) { log.debug("Closing the task for topic partitions: {}", partitions); if (writer != null) { writer.close(); writer = null; } } Loading
src/test/java/io/confluent/connect/elasticsearch/BulkProcessorTest.java +1 −1 Original line number Diff line number Diff line Loading @@ -43,7 +43,7 @@ public class BulkProcessorTest { private final String type = "connect"; private final String topic = "topic"; private final int partition = 0; private final long flushTimeoutMs = 20000; private final long flushTimeoutMs = 30000; private final long lingerMs = 2000; private final int maxRetry = 5; private final long retryBackoffMs = 3000; Loading