Commit fba141ef authored by Liquan Pei, committed by GitHub
Browse files

Merge pull request #3 from confluentinc/http

Use HTTP client
parents 64c2d2df 26f2bea1
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -19,5 +19,5 @@ connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
key.ignore=true
transport.addresses=localhost:9300
connection.url=localhost:9200
type.name=kafka-connect
 No newline at end of file
+17 −9
Original line number Diff line number Diff line
@@ -13,10 +13,11 @@
        <kafka.version>0.10.1.0-SNAPSHOT</kafka.version>
        <junit.version>4.12</junit.version>
        <es.version>2.3.3</es.version>
        <lucene.version>5.3.1</lucene.version>
        <lucene.version>5.5.0</lucene.version>
        <slf4j.version>1.7.5</slf4j.version>
        <jna.version>4.2.1</jna.version>
        <hamcrest.version>2.0.0.0</hamcrest.version>
        <jest.version>2.0.0</jest.version>
        <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
    </properties>

@@ -40,14 +41,9 @@
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${es.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-expressions</artifactId>
            <version>${lucene.version}</version>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>${jest.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
@@ -91,6 +87,18 @@
            <scope>test</scope>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${es.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-expressions</artifactId>
            <version>${lucene.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
+9 −9
Original line number Diff line number Diff line
@@ -29,7 +29,6 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,6 +40,7 @@ import java.util.Map;
import java.util.Set;

import io.confluent.connect.elasticsearch.internals.ESRequest;
import io.searchbox.client.JestClient;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
@@ -100,7 +100,7 @@ public class DataConverter {
  public static ESRequest convertRecord(
      SinkRecord record,
      String type,
      Client client,
      JestClient client,
      Converter converter,
      boolean ignoreKey,
      boolean ignoreSchema,
@@ -138,17 +138,17 @@ public class DataConverter {
      id = DataConverter.convertKey(key, keySchema);
    }

    if (!topicIgnoreSchema && !mappings.contains(index) && !Mapping.doesMappingExist(client, index, type, mappings)) {
    try {
      if (!topicIgnoreSchema && !mappings.contains(index) && !Mapping.doesMappingExist(client, index, type, mappings)) {
        Mapping.createMapping(client, index, type, valueSchema);
        mappings.add(index);
      }
    } catch (IOException e) {
      // TODO: It is possible that two clients are creating the mapping at the same time and
      // one request to create mapping may fail. In this case, we should allow the task to
      // proceed instead of throw the exception.
      throw new ConnectException("Cannot create mapping:", e);
    }
    }

    Schema newSchema;
    Object newValue;
+5 −4
Original line number Diff line number Diff line
@@ -29,9 +29,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final String ELASTICSEARCH_GROUP = "Elasticsearch";
  private static final String CONNECTOR_GROUP = "Connector";

  public static final String TRANSPORT_ADDRESSES_CONFIG = "transport.addresses";
  private static final String TRANSPORT_ADDRESSES_DOC = "The list of addresses to connect to Elasticsearch.";
  private static final String TRANSPORT_ADDRESSES_DISPLAY = "Transport Addresses";
  public static final String CONNECTION_URL_CONFIG = "connection.url";
  private static final String CONNECTION_URL_DOC = "The URL to connect to Elasticsearch.";
  private static final String CONNECTION_URL_DISPLAY = "Connection URL";

  public static final String TYPE_NAME_CONFIG = "type.name";
  private static final String TYPE_NAME_DOC = "The type to use for each index.";
@@ -119,7 +119,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static ConfigDef baseConfigDef() {
    return new ConfigDef()
        .define(TRANSPORT_ADDRESSES_CONFIG, Type.LIST, Importance.HIGH, TRANSPORT_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, TRANSPORT_ADDRESSES_DISPLAY)
        .define(CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH, CONNECTION_URL_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG,
                CONNECTION_URL_DISPLAY)
        .define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH, TYPE_NAME_DOC, ELASTICSEARCH_GROUP, 2, Width.SHORT, TYPE_NAME_DISPLAY)
        .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 3, Width.SHORT, KEY_IGNORE_DISPLAY)
        .define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
+11 −42
Original line number Diff line number Diff line
@@ -24,27 +24,25 @@ import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

public class ElasticsearchSinkTask extends SinkTask {

  private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
  private ElasticsearchWriter writer;
  private Client client;
  private JestClient client;
  private ElasticsearchWriter.Builder builder;
  private static Converter converter;

@@ -67,7 +65,7 @@ public class ElasticsearchSinkTask extends SinkTask {
  }

  // public for testing
  public void start(Map<String, String> props, Client client) {
  public void start(Map<String, String> props, JestClient client) {
    try {
      log.info("Starting ElasticsearchSinkTask.");

@@ -92,14 +90,10 @@ public class ElasticsearchSinkTask extends SinkTask {
      if (client != null) {
        this.client = client;
      } else {
        TransportClient transportClient = TransportClient.builder().build();
        List<InetSocketTransportAddress> addresses = parseAddress(config.getList(ElasticsearchSinkConnectorConfig.TRANSPORT_ADDRESSES_CONFIG));

        for (InetSocketTransportAddress address: addresses) {
          transportClient.addTransportAddress(address);
        }

        this.client = transportClient;
        String address = config.getString(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address).multiThreaded(true).build());
        this.client = factory.getObject();
      }

      builder = new ElasticsearchWriter.Builder(this.client)
@@ -119,8 +113,6 @@ public class ElasticsearchSinkTask extends SinkTask {

    } catch (ConfigException e) {
      throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e);
    } catch (UnknownHostException e) {
      throw new ConnectException("Couldn't start ElasticsearchSinkTask due to unknown host exception:", e);
    }
  }

@@ -154,7 +146,7 @@ public class ElasticsearchSinkTask extends SinkTask {
  // Releases the Elasticsearch client when Connect stops this task.
  // NOTE(review): this span is rendered diff output — `client.close()` appears to be the
  // removed transport-client line and `client.shutdownClient()` the added Jest-client line
  // (the field was changed from Client to JestClient in this commit); only one of the two
  // calls belongs in the actual file. TODO confirm against the raw diff.
  public void stop() throws ConnectException {
    log.info("Stopping ElasticsearchSinkTask.");
    if (client != null) {
      client.close();
      client.shutdownClient();
    }
  }

@@ -183,29 +175,6 @@ public class ElasticsearchSinkTask extends SinkTask {
    return topicConfigMap;
  }

  /**
   * Parses a single {@code host:port} string into an Elasticsearch transport address.
   *
   * @param address the configured address in {@code host:port} form
   * @return the resolved transport address
   * @throws ConfigException if the address is not of the form {@code host:port} or the
   *         port is not a valid integer
   * @throws UnknownHostException if the host part cannot be resolved
   */
  private InetSocketTransportAddress parseAddress(String address) throws UnknownHostException {
    String[] parts = address.split(":");
    if (parts.length != 2) {
      throw new ConfigException("Not valid address: " + address);
    }
    String host = parts[0];
    int port;
    try {
      port = Integer.parseInt(parts[1]);
    } catch (NumberFormatException e) {
      // Include the offending address so the misconfiguration is easy to spot
      // (original message "port is not a valid." was ungrammatical and contextless).
      throw new ConfigException("Not valid port in address: " + address, e);
    }
    return new InetSocketTransportAddress(InetAddress.getByName(host), port);
  }

  /**
   * Parses every configured {@code host:port} entry into a transport address,
   * preserving the order of the input list.
   *
   * @param addresses the configured address strings
   * @return the corresponding transport addresses, in input order
   * @throws UnknownHostException if any host cannot be resolved
   */
  private List<InetSocketTransportAddress> parseAddress(List<String> addresses) throws UnknownHostException {
    List<InetSocketTransportAddress> parsed = new LinkedList<>();
    for (int i = 0; i < addresses.size(); i++) {
      parsed.add(parseAddress(addresses.get(i)));
    }
    return parsed;
  }

  /**
   * Returns the {@link Converter} held in this task's static {@code converter} field.
   */
  public static Converter getConverter() {
    return converter;
  }
Loading