Commit 7a024455 authored by Shikhar Bhushan's avatar Shikhar Bhushan Committed by GitHub
Browse files

Merge pull request #26 from confluentinc/bulkprocessor-refactor

BulkProcessor refactor
parents ab5f0159 12300ac6
Loading
Loading
Loading
Loading
+96 −0
Original line number Diff line number Diff line
@@ -13,83 +13,84 @@
 * License for the specific language governing permissions and limitations under
 * the License.
 **/
package io.confluent.connect.elasticsearch;

package io.confluent.connect.elasticsearch.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import io.confluent.connect.elasticsearch.bulk.BulkClient;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResultHandler;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;

public class HttpClient implements Client<Response> {
public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {

  private static final Logger log = LoggerFactory.getLogger(HttpClient.class);
  private ObjectMapper objectMapper = new ObjectMapper();
  private final JestClient jestClient;
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  private static final Set<String> NON_RETRIABLE_ERROR_TYPES = Collections.singleton("mapper_parse_exception");

  public HttpClient(JestClient jestClient) {
    this.jestClient = jestClient;
  }
  private final JestClient client;

  @Override
  public void execute(RecordBatch batch, Callback<Response> callback) {
    Bulk bulk = constructBulk(batch, callback);
    jestClient.executeAsync(bulk, new CallbackHandler(callback));
  public BulkIndexingClient(JestClient client) {
    this.client = client;
  }

  @Override
  public void close() {
    // We shutdown the JEST client when sink tasks are stopped.
  }

  private Bulk constructBulk(RecordBatch batch, Callback<Response> callback) {
    Bulk.Builder builder = new Bulk.Builder();
    List<ESRequest> requests = batch.requests();
    for (ESRequest request: requests) {
      JsonNode data = null;
      try {
        data = objectMapper.readTree(request.getPayload());
      } catch (IOException e) {
        callback.onFailure(e);
      }
      Index index = new Index.Builder(data.toString())
  public Bulk bulkRequest(List<IndexableRecord> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexableRecord request : batch) {
      builder.addAction(
          new Index.Builder(request.getPayload())
              .index(request.getIndex())
              .type(request.getType())
              .id(request.getId())
          .build();
      builder.addAction(index);
              .build()
      );
    }
    return builder.build();
  }

  private static class CallbackHandler implements JestResultHandler<BulkResult> {
  /**
   * Runs the given bulk request through the Jest client and translates the
   * outcome into a {@link BulkResponse}.
   *
   * @param bulk the bulk request to send
   * @return the response derived from the client's bulk result
   * @throws IOException if the client call fails with an I/O error
   */
  @Override
  public BulkResponse execute(Bulk bulk) throws IOException {
    final BulkResult bulkResult = client.execute(bulk);
    return toBulkResponse(bulkResult);
  }

    private Callback<Response> callback;
  private static BulkResponse toBulkResponse(BulkResult result) {
    if (result.isSucceeded()) {
      return BulkResponse.success();
    }

    public CallbackHandler(Callback<Response> callback) {
      this.callback = callback;
    final List<BulkResult.BulkResultItem> failedItems = result.getFailedItems();
    if (failedItems.isEmpty()) {
      return BulkResponse.failure(true, result.getErrorMessage());
    }

    @Override
    public void completed(BulkResult result) {
      log.debug("Request completed with result: {}", result);
      callback.onResponse(new Response(result));
    boolean retriable = true;
    final List<String> errors = new ArrayList<>(failedItems.size());
    for (BulkResult.BulkResultItem failedItem : failedItems) {
      errors.add(failedItem.error);
      retriable &= isRetriableError(failedItem.error);
    }
    return BulkResponse.failure(retriable, errors.toString());
  }

    @Override
    public void failed(Exception e) {
      log.debug("Request failed with exception: {}", e.getMessage());
      callback.onFailure(e);
  /**
   * Decides whether a failed bulk item may be retried, based on its error payload.
   *
   * <p>The error is parsed as JSON and its {@code type} field is checked against
   * {@code NON_RETRIABLE_ERROR_TYPES}. Only a match makes the error non-retriable;
   * an error that is null, blank, not parseable as JSON, not a JSON object, or
   * lacking a {@code type} field is treated as retriable.
   *
   * @param error the raw error string reported for a failed bulk item, may be null
   * @return {@code false} only if the error's type is a known non-retriable type
   */
  private static boolean isRetriableError(String error) {
    if (error == null || error.trim().isEmpty()) {
      return true;
    }
    try {
      // path() never yields null: for non-object values or a missing field it returns a
      // "missing" node whose text is empty. This avoids the ClassCastException (error is a
      // JSON scalar/array) and NullPointerException (no "type" field) of a cast-and-get.
      final String errorType = OBJECT_MAPPER.readTree(error).path("type").asText();
      return !NON_RETRIABLE_ERROR_TYPES.contains(errorType);
    } catch (IOException e) {
      // The error is not valid JSON (for example a plain-text message), so its type is unknown.
      return true;
    }
  }

}
+8 −27
Original line number Diff line number Diff line
@@ -33,13 +33,13 @@ import org.apache.kafka.connect.storage.Converter;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import io.confluent.connect.elasticsearch.internals.ESRequest;
import io.searchbox.client.JestClient;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
@@ -53,14 +53,6 @@ public class DataConverter {
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  /**
   * Convert the key to the string representation.
   *
   * @param key The key of a SinkRecord.
   * @param keySchema The key schema.
   * @return The string representation of the key.
   * @throws ConnectException if the key is null.
   */
  public static String convertKey(Object key, Schema keySchema) {
    if (key == null) {
      throw new ConnectException("Key is used as document id and can not be null.");
@@ -86,26 +78,14 @@ public class DataConverter {
    }
  }

  /**
   * Convert a SinkRecord to an IndexRequest.
   *
   * @param record The SinkRecord to be converted.
   * @param client The client to connect to Elasticsearch.
   * @param ignoreKey Whether to ignore the key during indexing.
   * @param ignoreSchema Whether to ignore the schema during indexing.
   * @param topicConfigs The map of per topic configs.
   * @param mappings The mapping cache.
   * @return The converted IndexRequest.
   */

  public static ESRequest convertRecord(
  public static IndexableRecord convertRecord(
      SinkRecord record,
      String type,
      JestClient client,
      boolean ignoreKey,
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
      Set<String> mappings) {
      Set<String> mappingCache) {

    String topic = record.topic();
    int partition = record.kafkaPartition();
@@ -139,9 +119,9 @@ public class DataConverter {
    }

    try {
      if (!topicIgnoreSchema && !mappings.contains(index) && !Mapping.doesMappingExist(client, index, type, mappings)) {
      if (!topicIgnoreSchema && !mappingCache.contains(index) && !Mapping.doesMappingExist(client, index, type, mappingCache)) {
        Mapping.createMapping(client, index, type, valueSchema);
        mappings.add(index);
        mappingCache.add(index);
      }
    } catch (IOException e) {
      // TODO: It is possible that two clients are creating the mapping at the same time and
@@ -160,8 +140,9 @@ public class DataConverter {
      newValue = value;
    }

    byte[] json = JSON_CONVERTER.fromConnectData(topic, newSchema, newValue);
    return new ESRequest(index, type, id, json);
    String payload = new String(JSON_CONVERTER.fromConnectData(topic, newSchema, newValue), StandardCharsets.UTF_8);

    return new IndexableRecord(index, type, id, payload);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
+3 −4
Original line number Diff line number Diff line
@@ -65,9 +65,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static final String MAX_BUFFERED_RECORDS_CONFIG = "max.buffered.records";
  private static final String MAX_BUFFERED_RECORDS_DOC =
      "Approximately the max number of records each task will buffer. This config controls the memory usage for each task. When the number of "
      + "buffered records is larger than this value, the partitions assigned to this task will be paused.";
  private static final long MAX_BUFFERED_RECORDS_DEFAULT = 20000;
      "Approximately the max number of records each task will buffer. This config controls the memory usage for each task.";
  private static final int MAX_BUFFERED_RECORDS_DEFAULT = 20000;
  private static final String MAX_BUFFERED_RECORDS_DISPLAY = "Max Number of Records to Buffer";

  public static final String BATCH_SIZE_CONFIG = "batch.size";
@@ -135,7 +134,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, RETRY_BACKOFF_MS_DEFAULT, Importance.LOW, RETRY_BACKOFF_MS_DOC, CONNECTOR_GROUP, 11, Width.SHORT, RETRY_BACKOFF_MS_DISPLAY)
        .define(MAX_RETRIES_CONFIG, Type.INT, MAX_RETRIES_DEFAULT, Importance.LOW, MAX_RETRIES_DOC, CONNECTOR_GROUP, 12, Width.SHORT, MAX_RETRIES_DISPLAY)
        .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.LOW, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 13, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
        .define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.LOW, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 14, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY);
        .define(MAX_BUFFERED_RECORDS_CONFIG, Type.INT, MAX_BUFFERED_RECORDS_DEFAULT, Importance.LOW, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 14, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY);
  }

  static ConfigDef config = baseConfigDef();
+15 −16
Original line number Diff line number Diff line
@@ -41,7 +41,6 @@ public class ElasticsearchSinkTask extends SinkTask {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
  private ElasticsearchWriter writer;
  private JestClient client;
  private ElasticsearchWriter.Builder builder;

  @Override
  public String version() {
@@ -69,7 +68,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      Map<String, TopicConfig> topicConfigs = constructTopicConfig(topicIndex, topicIgnoreKey, topicIgnoreSchema);

      long flushTimeoutMs = config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
      long maxBufferedRecords = config.getLong(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
      int maxBufferedRecords = config.getInt(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
      int batchSize = config.getInt(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
      long lingerMs = config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
      int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
@@ -85,7 +84,7 @@ public class ElasticsearchSinkTask extends SinkTask {
        this.client = factory.getObject();
      }

      builder = new ElasticsearchWriter.Builder(this.client)
      ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
          .setType(type)
          .setIgnoreKey(ignoreKey)
          .setIgnoreSchema(ignoreSchema)
@@ -96,9 +95,10 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setBatchSize(batchSize)
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setContext(context);
          .setMaxRetry(maxRetry);

      writer = builder.build();
      writer.start();
    } catch (ConfigException e) {
      throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e);
    }
@@ -107,37 +107,36 @@ public class ElasticsearchSinkTask extends SinkTask {
  @Override
  public void open(Collection<TopicPartition> partitions) {
    log.debug("Opening the task for topic partitions: {}", partitions);
    writer = builder.build();
    Set<String> topics = new HashSet<>();
    for (TopicPartition tp : partitions) {
      topics.add(tp.topic());
    }
    writer.createIndices(topics);
  }

  @Override
  public void put(Collection<SinkRecord> records) throws ConnectException {
    log.trace("Putting {} to Elasticsearch.", records);
    if (writer != null) {
    writer.write(records);
  }
  }

  @Override
  public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    log.trace("Flushing data to Elasticsearch with the following offsets: {}", offsets);
    if (writer != null) {
    writer.flush();
  }
  }

  @Override
  public void close(Collection<TopicPartition> partitions) {
    log.debug("Closing the task for topic partitions: {}", partitions);
    if (writer != null) {
      writer.close();
      writer = null;
    }
  }

  @Override
  public void stop() throws ConnectException {
    log.info("Stopping ElasticsearchSinkTask.");
    if (writer != null) {
      writer.stop();
    }
    if (client != null) {
      client.shutdownClient();
    }
+49 −195
Original line number Diff line number Diff line
@@ -16,72 +16,41 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import io.confluent.connect.elasticsearch.internals.BulkProcessor;
import io.confluent.connect.elasticsearch.internals.ESRequest;
import io.confluent.connect.elasticsearch.internals.HttpClient;
import io.confluent.connect.elasticsearch.internals.Listener;
import io.confluent.connect.elasticsearch.internals.RecordBatch;
import io.confluent.connect.elasticsearch.internals.Response;
import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
import io.searchbox.action.Action;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.IndicesExists;

/**
 * The ElasticsearchWriter handles connections to Elasticsearch, sending data and flush.
 * Transport client is used to send requests to Elasticsearch cluster. Requests are batched
 * when sending to Elasticsearch. To ensure delivery guarantee and order, we retry in case of
 * failures for a batch.
 *
 * Currently, we only send out requests to Elasticsearch when flush is called, which is not
 * desirable from the latency point of view.
 *
 * TODO: Use offset as external version to fence requests with lower version.
 */
// TODO: Use offset as external version to fence requests with lower version.
public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);

  private final JestClient client;
  private final BulkProcessor bulkProcessor;
  private final String type;
  private final boolean ignoreKey;
  private final boolean ignoreSchema;
  private final Map<String, TopicConfig> topicConfigs;
  private final long flushTimeoutMs;
  private final long maxBufferedRecords;
  private final SinkTaskContext context;

  private final Set<String> mappings;
  private final BulkProcessor<IndexableRecord, ?> bulkProcessor;

  /**
   * ElasticsearchWriter constructor
   * @param client The client to connect to Elasticsearch.
   * @param type The type to use when writing to Elasticsearch.
   * @param ignoreKey Whether to ignore key during indexing.
   * @param ignoreSchema Whether to ignore schema during indexing.
   * @param topicConfigs The map of per topic configs.
   * @param flushTimeoutMs The flush timeout.
   * @param maxBufferedRecords The max number of buffered records.
   * @param maxInFlightRequests The max number of inflight requests allowed.
   * @param batchSize Approximately the max number of records each writer will buffer.
   * @param lingerMs The time to wait before sending a batch.
   * @param context The SinkTaskContext.
   */
  ElasticsearchWriter(
      JestClient client,
      String type,
@@ -89,39 +58,32 @@ public class ElasticsearchWriter {
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
      long flushTimeoutMs,
      long maxBufferedRecords,
      int maxBufferedRecords,
      int maxInFlightRequests,
      int batchSize,
      long lingerMs,
      int maxRetry,
      long retryBackoffMs,
      SinkTaskContext context) {

      int maxRetries,
      long retryBackoffMs
  ) {
    this.client = client;
    this.type = type;
    this.ignoreKey = ignoreKey;
    this.ignoreSchema = ignoreSchema;

    if (topicConfigs == null) {
      this.topicConfigs = new HashMap<>();
    } else {
      this.topicConfigs = topicConfigs;
    }

    this.topicConfigs = topicConfigs == null ? Collections.<String, TopicConfig>emptyMap() : topicConfigs;
    this.flushTimeoutMs = flushTimeoutMs;
    this.maxBufferedRecords  = maxBufferedRecords;

    this.context = context;

    // create index if needed.
    createIndices(topicConfigs);

    // Start the BulkProcessor
    bulkProcessor = new BulkProcessor(new HttpClient(client), maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, createDefaultListener());
    bulkProcessor.start();

    //Create mapping cache
    mappings = new HashSet<>();

    bulkProcessor = new BulkProcessor<>(
        new SystemTime(),
        new BulkIndexingClient(client),
        maxBufferedRecords,
        maxInFlightRequests,
        batchSize,
        lingerMs,
        maxRetries,
        retryBackoffMs
    );
  }

  public static class Builder {
@@ -131,188 +93,102 @@ public class ElasticsearchWriter {
    private boolean ignoreSchema = false;
    private Map<String, TopicConfig> topicConfigs = new HashMap<>();
    private long flushTimeoutMs;
    private long maxBufferedRecords;
    private int maxBufferedRecords;
    private int maxInFlightRequests;
    private int batchSize;
    private long lingerMs;
    private int maxRetry;
    private long retryBackoffMs;
    private SinkTaskContext context;

    /**
     * Constructor of ElasticsearchWriter Builder.
     * @param client The client to connect to Elasticsearch.
     */
    public Builder(JestClient client) {
      this.client = client;
    }

    /**
     * Set the index.
     * @param type The type to use for each index.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setType(String type) {
      this.type = type;
      return this;
    }

    /**
     * Set whether to ignore key during indexing.
     * @param ignoreKey Whether to ignore key.
     * @return an instance of ElasticsearchWriter Builder.
     */

    public Builder setIgnoreKey(boolean ignoreKey) {
      this.ignoreKey = ignoreKey;
      return this;
    }

    /**
     * Set whether to ignore schema during indexing.
     * @param ignoreSchema Whether to ignore key.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setIgnoreSchema(boolean ignoreSchema) {
      this.ignoreSchema = ignoreSchema;
      return this;
    }

    /**
     * Set per topic configurations.
     * @param topicConfigs The map of per topic configuration.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setTopicConfigs(Map<String, TopicConfig> topicConfigs) {
      this.topicConfigs = topicConfigs;
      return this;
    }

    /**
     * Set the flush timeout.
     * @param flushTimeoutMs The flush timeout in milliseconds.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setFlushTimoutMs(long flushTimeoutMs) {
      this.flushTimeoutMs = flushTimeoutMs;
      return this;
    }

    /**
     * Set the max number of records to buffer for each writer.
     * @param maxBufferedRecords The max number of buffered records.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxBufferedRecords(long maxBufferedRecords) {
    public Builder setMaxBufferedRecords(int maxBufferedRecords) {
      this.maxBufferedRecords = maxBufferedRecords;
      return this;
    }

    /**
     * Set the max number of inflight requests.
     * @param maxInFlightRequests The max allowed number of inflight requests.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxInFlightRequests(int maxInFlightRequests) {
      this.maxInFlightRequests = maxInFlightRequests;
      return this;
    }

    /**
     * Set the number of requests to process as a batch when writing.
     * to Elasticsearch.
     * @param batchSize the size of each batch.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setBatchSize(int batchSize) {
      this.batchSize = batchSize;
      return this;
    }

    /**
     * Set the linger time.
     * @param lingerMs The linger time to use in milliseconds.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setLingerMs(long lingerMs) {
      this.lingerMs = lingerMs;
      return this;
    }

    /**
     * Set the max retry for a batch
     * @param maxRetry The number of max retry.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMaxRetry(int maxRetry) {
      this.maxRetry = maxRetry;
      return this;
    }

    /**
     * Set the retry backoff.
     * @param retryBackoffMs The retry backoff in milliseconds.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setRetryBackoffMs(long retryBackoffMs) {
      this.retryBackoffMs = retryBackoffMs;
      return this;
    }

    /**
     * Set the SinkTaskContext
     * @param context The SinkTaskContext.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setContext(SinkTaskContext context) {
      this.context = context;
      return this;
    }

    /**
     * Build the ElasticsearchWriter.
     * @return an instance of ElasticsearchWriter.
     */
    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context);
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs
      );
    }
  }

  public void write(Collection<SinkRecord> records) {
    if (bulkProcessor.getException() != null) {
      throw new ConnectException("BulkProcessor failed with non-retriable exception", bulkProcessor.getException());
    }
    if (bulkProcessor.getTotalBufferedRecords() + records.size() > maxBufferedRecords) {
      throw new RetriableException("Exceeded max number of buffered records: " + maxBufferedRecords);
    }
    for (SinkRecord record: records) {
      ESRequest request = DataConverter.convertRecord(record, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(request);
    for (SinkRecord sinkRecord : records) {
      final IndexableRecord indexableRecord = DataConverter.convertRecord(sinkRecord, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(indexableRecord, flushTimeoutMs);
    }
  }

  public void flush() {
    try {
      if (!bulkProcessor.flush(flushTimeoutMs)) {
        throw new ConnectException("Cannot finish flush messages within " + flushTimeoutMs);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Throwable t) {
      throw new ConnectException("Flush failed with non retriable exception.", t);
    bulkProcessor.flush(flushTimeoutMs);
  }

  /**
   * Starts the underlying bulk processor by delegating to its {@code start()} method.
   */
  public void start() {
    bulkProcessor.start();
  }

  public void close() {
    bulkProcessor.stop();
  public void stop() {
    try {
      bulkProcessor.awaitStop(flushTimeoutMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Throwable t) {
      throw new ConnectException("Close failed with non retriable exception", t);
      bulkProcessor.flush(flushTimeoutMs);
    } catch (Exception e) {
      log.warn("Failed to flush during stop", e);
    }
    bulkProcessor.stop();
    bulkProcessor.awaitStop(flushTimeoutMs);
  }

  private boolean indexExists(String index) {
@@ -325,20 +201,16 @@ public class ElasticsearchWriter {
    }
  }

  private void createIndices(Map<String, TopicConfig> topicConfigs) {
    Set<TopicPartition> assignment = context.assignment();
    Set<String> topics = new HashSet<>();
    for (TopicPartition tp: assignment) {
      String topic = tp.topic();
      if (!topicConfigs.containsKey(topic)) {
        topics.add(topic);
  public void createIndices(Set<String> assignedTopics) {
    Set<String> indices = new HashSet<>();
    for (String topic : assignedTopics) {
      final TopicConfig topicConfig = topicConfigs.get(topic);
      if (topicConfig != null) {
        indices.add(topicConfig.getIndex());
      } else {
        indices.add(topic);
      }
    }

    Set<String> indices = new HashSet<>(topics);
    for (String topic: topicConfigs.keySet()) {
      indices.add(topicConfigs.get(topic).getIndex());
    }
    for (String index : indices) {
      if (!indexExists(index)) {
        CreateIndex createIndex = new CreateIndex.Builder(index).build();
@@ -354,22 +226,4 @@ public class ElasticsearchWriter {
    }
  }

  private Listener createDefaultListener() {
    return new Listener() {
      @Override
      public void beforeBulk(long executionId, RecordBatch batch) {

      }

      @Override
      public void afterBulk(long executionId, RecordBatch batch, Response response) {

      }

      @Override
      public void afterBulk(long executionId, RecordBatch batch, Throwable failure) {

      }
    };
  }
}
Loading