Commit 3be1adee authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

MINOR: Remove generic Converter passing to the DataConverter

The fact that DataConverter relies on the Connect JsonConverter is an implementation detail, so the generic Converter no longer needs to be passed in.
parent 635b4fbe
Loading
Loading
Loading
Loading
+10 −6
Original line number Diff line number Diff line
@@ -27,15 +27,15 @@ import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

@@ -47,10 +47,15 @@ import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConst

public class DataConverter {

  private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
  private static final Converter JSON_CONVERTER;
  static {
    JSON_CONVERTER = new JsonConverter();
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  /**
   * Convert the key to the string representation.
   *
   * @param key The key of a SinkRecord.
   * @param keySchema The key schema.
   * @return The string representation of the key.
@@ -83,9 +88,9 @@ public class DataConverter {

  /**
   * Convert a SinkRecord to an IndexRequest.
   *
   * @param record The SinkRecord to be converted.
   * @param client The client to connect to Elasticsearch.
   * @param converter The converter to use to convert the value to JSON.
   * @param ignoreKey Whether to ignore the key during indexing.
   * @param ignoreSchema Whether to ignore the schema during indexing.
   * @param topicConfigs The map of per topic configs.
@@ -97,7 +102,6 @@ public class DataConverter {
      SinkRecord record,
      String type,
      JestClient client,
      Converter converter,
      boolean ignoreKey,
      boolean ignoreSchema,
      Map<String, TopicConfig> topicConfigs,
@@ -156,7 +160,7 @@ public class DataConverter {
      newValue = value;
    }

    byte[] json = converter.fromConnectData(topic, newSchema, newValue);
    byte[] json = JSON_CONVERTER.fromConnectData(topic, newSchema, newValue);
    return new ESRequest(index, type, id, json);
  }

+1 −16
Original line number Diff line number Diff line
@@ -20,10 +20,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,15 +42,6 @@ public class ElasticsearchSinkTask extends SinkTask {
  private ElasticsearchWriter writer;
  private JestClient client;
  private ElasticsearchWriter.Builder builder;
  private static Converter converter;

  static {
    // Config the JsonConverter
    converter = new JsonConverter();
    Map<String, String> configs = new HashMap<>();
    configs.put("schemas.enable", "false");
    converter.configure(configs, false);
  }

  @Override
  public String version() {
@@ -108,8 +97,7 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setContext(context)
          .setConverter(converter);
          .setContext(context);

    } catch (ConfigException e) {
      throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e);
@@ -180,7 +168,4 @@ public class ElasticsearchSinkTask extends SinkTask {
    return topicConfigMap;
  }

  public static Converter getConverter() {
    return converter;
  }
}
+3 −21
Original line number Diff line number Diff line
@@ -21,7 +21,6 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -58,8 +57,6 @@ import io.searchbox.indices.IndicesExists;
public class ElasticsearchWriter {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);

  private final Converter converter;

  private final JestClient client;
  private final BulkProcessor bulkProcessor;
  private final String type;
@@ -98,8 +95,7 @@ public class ElasticsearchWriter {
      long lingerMs,
      int maxRetry,
      long retryBackoffMs,
      SinkTaskContext context,
      Converter converter) {
      SinkTaskContext context) {

    this.client = client;
    this.type = type;
@@ -120,9 +116,6 @@ public class ElasticsearchWriter {
    // create index if needed.
    createIndices(topicConfigs);

    // Config the JsonConverter
    this.converter = converter;

    // Start the BulkProcessor
    bulkProcessor = new BulkProcessor(new HttpClient(client), maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, createDefaultListener());
    bulkProcessor.start();
@@ -145,7 +138,6 @@ public class ElasticsearchWriter {
    private int maxRetry;
    private long retryBackoffMs;
    private SinkTaskContext context;
    private Converter converter = ElasticsearchSinkTask.getConverter();

    /**
     * Constructor of ElasticsearchWriter Builder.
@@ -277,23 +269,13 @@ public class ElasticsearchWriter {
      return this;
    }

    /**
     * Set the converter.
     * @param converter The converter to use.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setConverter(Converter converter) {
      this.converter = converter;
      return this;
    }

    /**
     * Build the ElasticsearchWriter.
     * @return an instance of ElasticsearchWriter.
     */
    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context, converter);
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context);
    }
  }

@@ -305,7 +287,7 @@ public class ElasticsearchWriter {
      throw new RetriableException("Exceeded max number of buffered records: " + maxBufferedRecords);
    }
    for (SinkRecord record: records) {
      ESRequest request = DataConverter.convertRecord(record, type, client, converter, ignoreKey, ignoreSchema, topicConfigs, mappings);
      ESRequest request = DataConverter.convertRecord(record, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(request);
    }
  }