Commit a9b4849d authored by Liquan Pei's avatar Liquan Pei
Browse files

Use HTTP client

parent 64c2d2df
Loading
Loading
Loading
Loading
+18 −10
Original line number Diff line number Diff line
@@ -12,11 +12,12 @@
        <confluent.version>3.0.0-SNAPSHOT</confluent.version>
        <kafka.version>0.10.1.0-SNAPSHOT</kafka.version>
        <junit.version>4.12</junit.version>
        <es.version>2.3.3</es.version>
        <lucene.version>5.3.1</lucene.version>
        <es.version>2.2.1</es.version>
        <lucene.version>5.4.1</lucene.version>
        <slf4j.version>1.7.5</slf4j.version>
        <jna.version>4.2.1</jna.version>
        <hamcrest.version>2.0.0.0</hamcrest.version>
        <jest.version>2.0.0</jest.version>
        <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
    </properties>

@@ -40,14 +41,9 @@
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${es.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-expressions</artifactId>
            <version>${lucene.version}</version>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>${jest.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
@@ -91,6 +87,18 @@
            <scope>test</scope>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${es.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-expressions</artifactId>
            <version>${lucene.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
+9 −9
Original line number Diff line number Diff line
@@ -29,7 +29,6 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,6 +40,7 @@ import java.util.Map;
import java.util.Set;

import io.confluent.connect.elasticsearch.internals.ESRequest;
import io.searchbox.client.JestClient;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
@@ -100,7 +100,7 @@ public class DataConverter {
  public static ESRequest convertRecord(
      SinkRecord record,
      String type,
      Client client,
      JestClient client,
      Converter converter,
      boolean ignoreKey,
      boolean ignoreSchema,
@@ -138,17 +138,17 @@ public class DataConverter {
      id = DataConverter.convertKey(key, keySchema);
    }

    if (!topicIgnoreSchema && !mappings.contains(index) && !Mapping.doesMappingExist(client, index, type, mappings)) {
    try {
      if (!topicIgnoreSchema && !mappings.contains(index) && !Mapping.doesMappingExist(client, index, type, mappings)) {
        Mapping.createMapping(client, index, type, valueSchema);
        mappings.add(index);
      }
    } catch (IOException e) {
      // TODO: It is possible that two clients are creating the mapping at the same time and
      // one request to create mapping may fail. In this case, we should allow the task to
      // proceed instead of throw the exception.
      throw new ConnectException("Cannot create mapping:", e);
    }
    }

    Schema newSchema;
    Object newValue;
+4 −4
Original line number Diff line number Diff line
@@ -29,9 +29,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final String ELASTICSEARCH_GROUP = "Elasticsearch";
  private static final String CONNECTOR_GROUP = "Connector";

  public static final String TRANSPORT_ADDRESSES_CONFIG = "transport.addresses";
  private static final String TRANSPORT_ADDRESSES_DOC = "The list of addresses to connect to Elasticsearch.";
  private static final String TRANSPORT_ADDRESSES_DISPLAY = "Transport Addresses";
  public static final String HTTP_ADDRESSES_CONFIG = "http.addresses";
  private static final String HTTP_ADDRESSES_DOC = "The list of addresses to connect to Elasticsearch.";
  private static final String HTTP_ADDRESSES_DISPLAY = "HTTP Addresses";

  public static final String TYPE_NAME_CONFIG = "type.name";
  private static final String TYPE_NAME_DOC = "The type to use for each index.";
@@ -119,7 +119,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static ConfigDef baseConfigDef() {
    return new ConfigDef()
        .define(TRANSPORT_ADDRESSES_CONFIG, Type.LIST, Importance.HIGH, TRANSPORT_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, TRANSPORT_ADDRESSES_DISPLAY)
        .define(HTTP_ADDRESSES_CONFIG, Type.LIST, Importance.HIGH, HTTP_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, HTTP_ADDRESSES_DISPLAY)
        .define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH, TYPE_NAME_DOC, ELASTICSEARCH_GROUP, 2, Width.SHORT, TYPE_NAME_DISPLAY)
        .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 3, Width.SHORT, KEY_IGNORE_DISPLAY)
        .define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
+11 −42
Original line number Diff line number Diff line
@@ -24,27 +24,25 @@ import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

public class ElasticsearchSinkTask extends SinkTask {

  private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
  private ElasticsearchWriter writer;
  private Client client;
  private JestClient client;
  private ElasticsearchWriter.Builder builder;
  private static Converter converter;

@@ -67,7 +65,7 @@ public class ElasticsearchSinkTask extends SinkTask {
  }

  // public for testing
  public void start(Map<String, String> props, Client client) {
  public void start(Map<String, String> props, JestClient client) {
    try {
      log.info("Starting ElasticsearchSinkTask.");

@@ -92,14 +90,10 @@ public class ElasticsearchSinkTask extends SinkTask {
      if (client != null) {
        this.client = client;
      } else {
        TransportClient transportClient = TransportClient.builder().build();
        List<InetSocketTransportAddress> addresses = parseAddress(config.getList(ElasticsearchSinkConnectorConfig.TRANSPORT_ADDRESSES_CONFIG));

        for (InetSocketTransportAddress address: addresses) {
          transportClient.addTransportAddress(address);
        }

        this.client = transportClient;
        List<String> addresses = config.getList(ElasticsearchSinkConnectorConfig.HTTP_ADDRESSES_CONFIG);
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(addresses.get(0)).multiThreaded(true).build());
        this.client = factory.getObject();
      }

      builder = new ElasticsearchWriter.Builder(this.client)
@@ -119,8 +113,6 @@ public class ElasticsearchSinkTask extends SinkTask {

    } catch (ConfigException e) {
      throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e);
    } catch (UnknownHostException e) {
      throw new ConnectException("Couldn't start ElasticsearchSinkTask due to unknown host exception:", e);
    }
  }

@@ -154,7 +146,7 @@ public class ElasticsearchSinkTask extends SinkTask {
  /**
   * Stop the task and release the Elasticsearch client.
   *
   * <p>Fix: the block contained both the old transport-client {@code client.close()} call and
   * the new Jest {@code client.shutdownClient()} call (merge/diff residue). The field is a
   * {@code JestClient} (see the field declaration), whose shutdown method in Jest 2.x is
   * {@code shutdownClient()}, so only that call is kept.
   */
  public void stop() throws ConnectException {
    log.info("Stopping ElasticsearchSinkTask.");
    if (client != null) {
      // JestClient (Jest 2.x) releases its underlying HTTP resources via shutdownClient().
      client.shutdownClient();
    }
  }

@@ -183,29 +175,6 @@ public class ElasticsearchSinkTask extends SinkTask {
    return topicConfigMap;
  }

  /**
   * Parse a single {@code host:port} string into an {@code InetSocketTransportAddress}.
   *
   * <p>Fix: the original port error message read "port is not a valid." (broken grammar) and
   * omitted both the offending value and the full address, making misconfiguration hard to
   * diagnose from logs.
   *
   * @param address the address string, expected in {@code host:port} form.
   *                NOTE(review): splitting on ":" rejects bracketed IPv6 literals — this
   *                matches the original behavior; confirm whether IPv6 support is required.
   * @return the resolved transport address
   * @throws ConfigException if the address is malformed or the port is not numeric
   * @throws UnknownHostException if the host name cannot be resolved
   */
  private InetSocketTransportAddress parseAddress(String address) throws UnknownHostException {
    String[] parts = address.split(":");
    if (parts.length != 2) {
      throw new ConfigException("Not a valid address: " + address + " (expected host:port)");
    }
    String host = parts[0];
    int port;
    try {
      port = Integer.parseInt(parts[1]);
    } catch (NumberFormatException e) {
      throw new ConfigException("Port is not a valid number in address: " + address, e);
    }
    return new InetSocketTransportAddress(InetAddress.getByName(host), port);
  }

  /**
   * Resolve every {@code host:port} entry in the given list into a transport address.
   *
   * @param addresses the raw address strings to resolve
   * @return the resolved addresses, in the same order as the input
   * @throws UnknownHostException if any host name cannot be resolved
   */
  private List<InetSocketTransportAddress> parseAddress(List<String> addresses) throws UnknownHostException {
    List<InetSocketTransportAddress> resolved = new LinkedList<>();
    for (String rawAddress : addresses) {
      InetSocketTransportAddress parsed = parseAddress(rawAddress);
      resolved.add(parsed);
    }
    return resolved;
  }

  /**
   * Return the shared {@link Converter} used by all tasks of this connector.
   * NOTE(review): the field is static and appears to be assigned during task start —
   * confirm initialization ordering before relying on a non-null return here.
   */
  public static Converter getConverter() {
    return converter;
  }
+32 −40
Original line number Diff line number Diff line
@@ -21,11 +21,10 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,9 +33,15 @@ import java.util.Set;

import io.confluent.connect.elasticsearch.internals.BulkProcessor;
import io.confluent.connect.elasticsearch.internals.ESRequest;
import io.confluent.connect.elasticsearch.internals.ESResponse;
import io.confluent.connect.elasticsearch.internals.HttpClient;
import io.confluent.connect.elasticsearch.internals.Listener;
import io.confluent.connect.elasticsearch.internals.RecordBatch;
import io.confluent.connect.elasticsearch.internals.Response;
import io.searchbox.action.Action;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.IndicesExists;

/**
 * The ElasticsearchWriter handles connections to Elasticsearch, sending data and flush.
@@ -47,8 +52,6 @@ import io.confluent.connect.elasticsearch.internals.RecordBatch;
 * Currently, we only send out requests to Elasticsearch when flush is called, which is not
 * desirable from the latency point of view.
 *
 * TODO: Use REST instead of transport client.
 *
 * TODO: Use offset as external version to fence requests with lower version.
 */
public class ElasticsearchWriter {
@@ -56,7 +59,7 @@ public class ElasticsearchWriter {

  private final Converter converter;

  private final Client client;
  private final JestClient client;
  private final BulkProcessor bulkProcessor;
  private final String type;
  private final boolean ignoreKey;
@@ -80,10 +83,9 @@ public class ElasticsearchWriter {
   * @param batchSize Approximately the max number of records each writer will buffer.
   * @param lingerMs The time to wait before sending a batch.
   * @param context The SinkTaskContext.
   * @param mock Whether to use mock Elasticsearch client.
   */
  ElasticsearchWriter(
      Client client,
      JestClient client,
      String type,
      boolean ignoreKey,
      boolean ignoreSchema,
@@ -96,8 +98,7 @@ public class ElasticsearchWriter {
      int maxRetry,
      long retryBackoffMs,
      SinkTaskContext context,
      Converter converter,
      boolean mock) {
      Converter converter) {

    this.client = client;
    this.type = type;
@@ -116,15 +117,13 @@ public class ElasticsearchWriter {
    this.context = context;

    // create index if needed.
    if (!mock) {
    createIndices(topicConfigs);
    }

    // Config the JsonConverter
    this.converter = converter;

    // Start the BulkProcessor
    bulkProcessor = new BulkProcessor(client, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, createDefaultListener());
    bulkProcessor = new BulkProcessor(new HttpClient(client), maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, createDefaultListener());
    bulkProcessor.start();

    //Create mapping cache
@@ -132,8 +131,7 @@ public class ElasticsearchWriter {
  }

  public static class Builder {

    private final Client client;
    private final JestClient client;
    private String type;
    private boolean ignoreKey = false;
    private boolean ignoreSchema = false;
@@ -147,13 +145,12 @@ public class ElasticsearchWriter {
    private long retryBackoffMs;
    private SinkTaskContext context;
    private Converter converter = ElasticsearchSinkTask.getConverter();
    private boolean mock;

    /**
     * Constructor of ElasticsearchWriter Builder.
     * @param client The client to connect to Elasticsearch.
     */
    public Builder(Client client) {
    public Builder(JestClient client) {
      this.client = client;
    }

@@ -289,23 +286,13 @@ public class ElasticsearchWriter {
      return this;
    }

    /**
     * Set whether to use a mock Elasticsearch client instead of connecting to a real cluster.
     * NOTE(review): per the constructor, mock mode skips {@code createIndices} — intended for
     * tests only; confirm no production configuration enables it.
     * @param mock Whether to use mock client.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setMock(boolean mock) {
      this.mock = mock;
      return this;
    }

    /**
     * Build the ElasticsearchWriter.
     * @return an instance of ElasticsearchWriter.
     */
    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context, converter, mock);
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context, converter);
    }
  }

@@ -345,9 +332,14 @@ public class ElasticsearchWriter {
  }

  /**
   * Check whether the given index exists in Elasticsearch via the Jest HTTP client.
   *
   * <p>Fix: the old transport-client one-liner
   * ({@code client.admin().indices().prepareExists(...)}) was still present above the new
   * Jest implementation (diff residue), leaving the Jest code unreachable and the method
   * invalid. Only the Jest path is kept.
   *
   * @param index the index name to check
   * @return true if the IndicesExists action succeeded (index present)
   * @throws ConnectException if the HTTP call to Elasticsearch fails
   */
  private boolean indexExists(String index) {
    Action action = new IndicesExists.Builder(index).build();
    try {
      JestResult result = client.execute(action);
      return result.isSucceeded();
    } catch (IOException e) {
      throw new ConnectException(e);
    }
  }


  private void createIndices(Map<String, TopicConfig> topicConfigs) {
    Set<TopicPartition> assignment = context.assignment();
@@ -363,17 +355,17 @@ public class ElasticsearchWriter {
    for (String topic: topicConfigs.keySet()) {
      indices.add(topicConfigs.get(topic).getIndex());
    }

    for (String index: indices) {
      if (!indexExists(index)) {
        CreateIndexResponse createIndexResponse = client.admin().indices()
            .prepareCreate(index)
            .execute()
            .actionGet();

        if (!createIndexResponse.isAcknowledged()) {
        CreateIndex createIndex = new CreateIndex.Builder(index).build();
        try {
          JestResult result = client.execute(createIndex);
          if (!result.isSucceeded()) {
            throw new ConnectException("Could not create index:" + index);
          }
        } catch (IOException e) {
          throw new ConnectException(e);
        }
      }
    }
  }
@@ -386,7 +378,7 @@ public class ElasticsearchWriter {
      }

      @Override
      public void afterBulk(long executionId, RecordBatch batch, ESResponse response) {
      public void afterBulk(long executionId, RecordBatch batch, Response response) {

      }

Loading