Unverified Commit 23a31fc9 authored by Robert Yokota's avatar Robert Yokota Committed by GitHub
Browse files

Merge pull request #215 from rayokota/CC-374-basic-auth

CC-374 Support basic auth with secure Elasticsearch
parents 5a8cb33b 9d3b97e0
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -9,7 +9,7 @@
    <!-- switch statements on types exceed maximum complexity -->
    <suppress
            checks="(CyclomaticComplexity)"
            files="(Mapping).java"
            files="(JestElasticsearchClient|Mapping).java"
    />

    <!-- TODO: Undecided if this is too much -->
+12 −0
Original line number Diff line number Diff line
@@ -13,6 +13,18 @@ Connector
  * Type: list
  * Importance: high

``connection.username``
  The username used to authenticate with Elasticsearch. The default is null, and authentication will only be performed if both the username and password are non-null.

  * Type: string
  * Importance: medium

``connection.password``
  The password used to authenticate with Elasticsearch. The default is null, and authentication will only be performed if both the username and password are non-null.

  * Type: string
  * Importance: medium

``batch.size``
  The number of records to process as a batch when writing to Elasticsearch.

+3 −1
Original line number Diff line number Diff line
@@ -338,4 +338,6 @@ connector jobs to achieve double writes:
      
Security
--------
The Elasticsearch connector can read data from secure Kafka by following the instructions in the :ref:`Connect security documentation <connect_security>`. The functionality to write data to a secured Elasticsearch instance is not yet implemented.
The Elasticsearch connector can read data from secure Kafka by following the instructions in the :ref:`Connect security documentation <connect_security>`.

The Elasticsearch connector can write data to a secure Elasticsearch cluster that supports basic authentication by setting the ``connection.username`` and ``connection.password`` configuration properties.
+30 −0
Original line number Diff line number Diff line
@@ -33,6 +33,16 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final String CONNECTION_URL_DOC =
      "List of Elasticsearch HTTP connection URLs e.g. ``http://eshost1:9200,"
      + "http://eshost2:9200``.";
  public static final String CONNECTION_USERNAME_CONFIG = "connection.username";
  private static final String CONNECTION_USERNAME_DOC =
      "The username used to authenticate with Elasticsearch. "
      + "The default is the null, and authentication will only be performed if "
      + " both the username and password are non-null.";
  public static final String CONNECTION_PASSWORD_CONFIG = "connection.password";
  private static final String CONNECTION_PASSWORD_DOC =
      "The password used to authenticate with Elasticsearch. "
      + "The default is the null, and authentication will only be performed if "
      + " both the username and password are non-null.";
  public static final String BATCH_SIZE_CONFIG = "batch.size";
  private static final String BATCH_SIZE_DOC =
      "The number of records to process as a batch when writing to Elasticsearch.";
@@ -158,6 +168,26 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
        ++order,
        Width.LONG,
        "Connection URLs"
    ).define(
        CONNECTION_USERNAME_CONFIG,
        Type.STRING,
        null,
        Importance.MEDIUM,
        CONNECTION_USERNAME_DOC,
        group,
        ++order,
        Width.SHORT,
        "Connection Username"
    ).define(
        CONNECTION_PASSWORD_CONFIG,
        Type.STRING,
        null,
        Importance.MEDIUM,
        CONNECTION_PASSWORD_DOC,
        group,
        ++order,
        Width.SHORT,
        "Connection Password"
    ).define(
        BATCH_SIZE_CONFIG,
        Type.INT,
+33 −5
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.IndicesExists;
import io.searchbox.indices.mapping.GetMapping;
import io.searchbox.indices.mapping.PutMapping;
import org.apache.http.HttpHost;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
@@ -55,6 +56,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class JestElasticsearchClient implements ElasticsearchClient {

@@ -108,6 +110,11 @@ public class JestElasticsearchClient implements ElasticsearchClient {
  }

  public JestElasticsearchClient(Map<String, String> props) {
    this(props, new JestClientFactory());
  }

  // visible for testing
  protected JestElasticsearchClient(Map<String, String> props, JestClientFactory factory) {
    try {
      ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
      final int connTimeout = config.getInt(
@@ -115,15 +122,24 @@ public class JestElasticsearchClient implements ElasticsearchClient {
      final int readTimeout = config.getInt(
          ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);

      final String username = config.getString(
          ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG);
      final String password = config.getString(
          ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG);

      List<String> address =
          config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
      JestClientFactory factory = new JestClientFactory();
      factory.setHttpClientConfig(new HttpClientConfig.Builder(address)
      HttpClientConfig.Builder builder = new HttpClientConfig.Builder(address)
          .connTimeout(connTimeout)
          .readTimeout(readTimeout)
          .multiThreaded(true)
          .build()
      );
          .multiThreaded(true);
      if (username != null && password != null) {
        builder.defaultCredentials(username, password)
            .preemptiveAuthTargetHosts(address.stream()
                .map(addr -> HttpHost.create(addr)).collect(Collectors.toSet()));
      }
      HttpClientConfig httpClientConfig = builder.build();
      factory.setHttpClientConfig(httpClientConfig);
      this.client = factory.getObject();
      this.version = getServerVersion();
    } catch (IOException e) {
@@ -154,6 +170,8 @@ public class JestElasticsearchClient implements ElasticsearchClient {
      return defaultVersion;
    }

    checkForError(result);

    JsonObject nodesRoot = result.get("nodes").getAsJsonObject();
    if (nodesRoot == null || nodesRoot.entrySet().size() == 0) {
      LOG.warn("Couldn't get Elasticsearch version, nodesRoot is null or empty");
@@ -182,6 +200,16 @@ public class JestElasticsearchClient implements ElasticsearchClient {
    return defaultVersion;
  }

  private void checkForError(JsonObject result) {
    if (result.has("error") && result.get("error").isJsonObject()) {
      final JsonObject errorObject = result.get("error").getAsJsonObject();
      String errorType = errorObject.has("type") ? errorObject.get("type").getAsString() : "";
      String errorReason = errorObject.has("reason") ? errorObject.get("reason").getAsString() : "";
      throw new ConnectException("Couldn't connect to Elasticsearch, error: "
          + errorType + ", reason: " + errorReason);
    }
  }

  public Version getVersion() {
    return version;
  }
Loading