Commit bc24949e authored by Robert Yokota's avatar Robert Yokota
Browse files

Add unit tests for Jest client; more cleanup from code review

parent 9446aa4f
Loading
Loading
Loading
Loading
+7 −34
Original line number Diff line number Diff line
@@ -39,15 +39,10 @@
    </scm>

    <properties>
        <!--
        <es.version>6.0.0</es.version>
        <lucene.version>7.0.1</lucene.version>
        <es.version>5.0.0</es.version>
        <lucene.version>6.2.0</lucene.version>
        -->
        <es.version>2.4.1</es.version>
        <lucene.version>5.5.2</lucene.version>
        <hamcrest.version>1.3</hamcrest.version>
        <mockito.version>2.13.0</mockito.version>
        <jest.version>2.4.0</jest.version>
        <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
    </properties>
@@ -82,6 +77,12 @@
            <version>${hamcrest.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>${mockito.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
@@ -100,7 +101,6 @@
            <version>${lucene.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- For ES 2.x -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
@@ -108,33 +108,6 @@
            <scope>test</scope>
            <type>test-jar</type>
        </dependency>
        <!-- For ES 5.x (requires Java 8) -->
        <!--
        <dependency>
            <groupId>org.elasticsearch.test</groupId>
            <artifactId>framework</artifactId>
            <version>${es.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>transport-netty4-client</artifactId>
            <version>${es.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.7</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.7</version>
            <scope>test</scope>
        </dependency>
        -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
+1 −0
Original line number Diff line number Diff line
@@ -30,5 +30,6 @@ public class ElasticsearchSinkConnectorConstants {
  public static final String DOUBLE_TYPE = "double";
  public static final String STRING_TYPE = "string";
  public static final String TEXT_TYPE = "text";
  public static final String KEYWORD_TYPE = "keyword";
  public static final String DATE_TYPE = "date";
}
+1 −4
Original line number Diff line number Diff line
@@ -24,7 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
@@ -314,9 +313,7 @@ public class ElasticsearchWriter {
  }

  public void createIndicesForTopics(Set<String> assignedTopics) {
    if (assignedTopics == null) {
      throw new NullPointerException();
    }
    Objects.requireNonNull(assignedTopics);
    client.createIndices(indicesForTopics(assignedTopics));
  }

+10 −6
Original line number Diff line number Diff line
@@ -49,16 +49,19 @@ public class Mapping {

  /**
   * Create an explicit mapping.
   *
   * @param client The client to connect to Elasticsearch.
   * @param index  The index to write to Elasticsearch.
   * @param type   The type to create mapping for.
   * @param schema The schema used to infer mapping.
   * @throws IOException from underlying JestClient
   */
  public static void createMapping(ElasticsearchClient client,
  public static void createMapping(
      ElasticsearchClient client,
      String index,
      String type,
                                   Schema schema)
      Schema schema
  )
      throws IOException {
    client.createMapping(index, type, schema);
  }
@@ -73,6 +76,7 @@ public class Mapping {

  /**
   * Infer mapping from the provided schema.
   *
   * @param schema The schema used to infer mapping.
   */
  public static JsonNode inferMapping(ElasticsearchClient client, Schema schema) {
+22 −1
Original line number Diff line number Diff line
@@ -57,6 +57,13 @@ import java.util.Map;
import java.util.Set;

public class JestElasticsearchClient implements ElasticsearchClient {

  // visible for testing
  protected static final String MAPPER_PARSE_EXCEPTION
      = "mapper_parse_exception";
  protected static final String VERSION_CONFLICT_ENGINE_EXCEPTION
      = "version_conflict_engine_exception";

  private static final Logger LOG = LoggerFactory.getLogger(JestElasticsearchClient.class);

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -64,6 +71,19 @@ public class JestElasticsearchClient implements ElasticsearchClient {
  private final JestClient client;
  private final Version version;

  // visible for testing
  public JestElasticsearchClient(JestClient client) {
    try {
      this.client = client;
      this.version = getServerVersion();
    } catch (IOException e) {
      throw new ConnectException(
          "Couldn't start ElasticsearchSinkTask due to connection error:",
          e
      );
    }
  }

  // visible for testing
  public JestElasticsearchClient(String address) {
    try {
@@ -234,7 +254,8 @@ public class JestElasticsearchClient implements ElasticsearchClient {
    return new JestBulkRequest(builder.build());
  }

  private BulkableAction toBulkableAction(IndexableRecord record) {
  // visible for testing
  protected BulkableAction toBulkableAction(IndexableRecord record) {
    // If payload is null, the record was a tombstone and we should delete from the index.
    return record.payload != null ? toIndexRequest(record) : toDeleteRequest(record);
  }
Loading