Commit 9d3b97e0 authored by Robert Yokota's avatar Robert Yokota
Browse files

Allow for empty username or password

parent dcb664cb
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -14,13 +14,13 @@ Connector
  * Importance: high

``connection.username``
  The username used to authenticate with Elasticsearch. The default is the empty string, and authentication will only be performed if both the username and password are non-empty.
  The username used to authenticate with Elasticsearch. The default is null, and authentication will only be performed if both the username and password are non-null.

  * Type: string
  * Importance: medium

``connection.password``
  The password used to authenticate with Elasticsearch. The default is the empty string, and authentication will only be performed if both the username and password are non-empty.
  The password used to authenticate with Elasticsearch. The default is null, and authentication will only be performed if both the username and password are non-null.

  * Type: string
  * Importance: medium
+6 −6
Original line number Diff line number Diff line
@@ -36,13 +36,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  public static final String CONNECTION_USERNAME_CONFIG = "connection.username";
  private static final String CONNECTION_USERNAME_DOC =
      "The username used to authenticate with Elasticsearch. "
      + "The default is the empty string, and authentication will only be performed if "
      + " both the username and password are non-empty.";
      + "The default is the null, and authentication will only be performed if "
      + " both the username and password are non-null.";
  public static final String CONNECTION_PASSWORD_CONFIG = "connection.password";
  private static final String CONNECTION_PASSWORD_DOC =
      "The password used to authenticate with Elasticsearch. "
      + "The default is the empty string, and authentication will only be performed if "
      + " both the username and password are non-empty.";
      + "The default is the null, and authentication will only be performed if "
      + " both the username and password are non-null.";
  public static final String BATCH_SIZE_CONFIG = "batch.size";
  private static final String BATCH_SIZE_DOC =
      "The number of records to process as a batch when writing to Elasticsearch.";
@@ -171,7 +171,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
    ).define(
        CONNECTION_USERNAME_CONFIG,
        Type.STRING,
        "",
        null,
        Importance.MEDIUM,
        CONNECTION_USERNAME_DOC,
        group,
@@ -181,7 +181,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
    ).define(
        CONNECTION_PASSWORD_CONFIG,
        Type.STRING,
        "",
        null,
        Importance.MEDIUM,
        CONNECTION_PASSWORD_DOC,
        group,
+1 −1
Original line number Diff line number Diff line
@@ -133,7 +133,7 @@ public class JestElasticsearchClient implements ElasticsearchClient {
          .connTimeout(connTimeout)
          .readTimeout(readTimeout)
          .multiThreaded(true);
      if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
      if (username != null && password != null) {
        builder.defaultCredentials(username, password)
            .preemptiveAuthTargetHosts(address.stream()
                .map(addr -> HttpHost.create(addr)).collect(Collectors.toSet()));
+20 −0
Original line number Diff line number Diff line
@@ -117,6 +117,26 @@ public class JestElasticsearchClientTest {
    assertEquals(HttpHost.create("http://localhost:9200"), preemptiveAuthTargetHosts.iterator().next());
  }

  @Test
  public void connectsSecurelyWithEmptyUsernameAndPassword() {
    Map<String, String> props = new HashMap<>();
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost:9200");
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG, "");
    props.put(ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG, "");
    props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, "kafka-connect");
    JestElasticsearchClient client = new JestElasticsearchClient(props, jestClientFactory);

    ArgumentCaptor<HttpClientConfig> captor = ArgumentCaptor.forClass(HttpClientConfig.class);
    verify(jestClientFactory).setHttpClientConfig(captor.capture());
    HttpClientConfig httpClientConfig = captor.getValue();
    CredentialsProvider credentialsProvider = httpClientConfig.getCredentialsProvider();
    Credentials credentials = credentialsProvider.getCredentials(AuthScope.ANY);
    Set<HttpHost> preemptiveAuthTargetHosts = httpClientConfig.getPreemptiveAuthTargetHosts();
    assertEquals("", credentials.getUserPrincipal().getName());
    assertEquals("", credentials.getPassword());
    assertEquals(HttpHost.create("http://localhost:9200"), preemptiveAuthTargetHosts.iterator().next());
  }

  @Test
  public void getsVersion() {
    JestElasticsearchClient client = new JestElasticsearchClient(jestClient);