Commit 9446aa4f authored by Robert Yokota's avatar Robert Yokota
Browse files

Incorporate feedback from code review

parent e064acd8
Loading
Loading
Loading
Loading
+12 −12
Original line number Diff line number Diff line
@@ -100,18 +100,6 @@
            <version>${lucene.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.7</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.7</version>
            <scope>test</scope>
        </dependency>
        <!-- For ES 2.x -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
@@ -134,6 +122,18 @@
            <version>${es.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.7</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.7</version>
            <scope>test</scope>
        </dependency>
        -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
+9 −7
Original line number Diff line number Diff line
@@ -25,23 +25,23 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;

public interface ElasticsearchClient {
public interface ElasticsearchClient extends AutoCloseable {

  enum Version {
    ONE, TWO, FIVE, SIX
    ES_V1, ES_V2, ES_V5, ES_V6
  }

  /**
   * Gets the Elasticsearch version.
   *
   * @return the version
   * @return the version, not null
   */
  Version getVersion();

  /**
   * Creates indices.
   *
   * @param indices the set of index names to create
   * @param indices the set of index names to create, not null
   */
  void createIndices(Set<String> indices);

@@ -51,7 +51,7 @@ public interface ElasticsearchClient {
   * @param index the index to write
   * @param type the type for which to create the mapping
   * @param schema the schema used to infer the mapping
   * @throws IOException from underlying client
   * @throws IOException if the client cannot execute the request
   */
  void createMapping(String index, String type, Schema schema) throws IOException;

@@ -60,7 +60,7 @@ public interface ElasticsearchClient {
   *
   * @param index the index
   * @param type the type
   * @throws IOException from underlying client
   * @throws IOException if the client cannot execute the request
   */
  JsonObject getMapping(String index, String type) throws IOException;

@@ -77,6 +77,7 @@ public interface ElasticsearchClient {
   *
   * @param bulk the bulk request
   * @return the bulk response
   * @throws IOException if the client cannot execute the request
   */
  BulkResponse executeBulk(BulkRequest bulk) throws IOException;

@@ -87,11 +88,12 @@ public interface ElasticsearchClient {
   * @param index the index to search
   * @param type the type to search
   * @return the search result
   * @throws IOException if the client cannot execute the request
   */
  JsonObject search(String query, String index, String type) throws IOException;

  /**
   * Shuts down the client.
   */
  void shutdown();
  void close();
}
+18 −18
Original line number Diff line number Diff line
@@ -56,47 +56,47 @@ public class ElasticsearchSinkTask extends SinkTask {
      log.info("Starting ElasticsearchSinkTask.");

      ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
      final String type = config.getString(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG);
      final boolean ignoreKey =
      String type = config.getString(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG);
      boolean ignoreKey =
          config.getBoolean(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG);
      final boolean ignoreSchema =
      boolean ignoreSchema =
          config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);
      final boolean useCompactMapEntries =
      boolean useCompactMapEntries =
          config.getBoolean(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG);


      final Map<String, String> topicToIndexMap =
      Map<String, String> topicToIndexMap =
          parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
      final Set<String> topicIgnoreKey =
      Set<String> topicIgnoreKey =
          new HashSet<>(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_KEY_IGNORE_CONFIG));
      final Set<String> topicIgnoreSchema = new HashSet<>(
      Set<String> topicIgnoreSchema = new HashSet<>(
          config.getList(ElasticsearchSinkConnectorConfig.TOPIC_SCHEMA_IGNORE_CONFIG)
      );

      final long flushTimeoutMs =
      long flushTimeoutMs =
          config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
      final int maxBufferedRecords =
      int maxBufferedRecords =
          config.getInt(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
      final int batchSize =
      int batchSize =
          config.getInt(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
      final long lingerMs =
      long lingerMs =
          config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
      final int maxInFlightRequests =
      int maxInFlightRequests =
          config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
      final long retryBackoffMs =
      long retryBackoffMs =
          config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
      final int maxRetry =
      int maxRetry =
          config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
      final boolean dropInvalidMessage =
      boolean dropInvalidMessage =
          config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);

      final DataConverter.BehaviorOnNullValues behaviorOnNullValues =
      DataConverter.BehaviorOnNullValues behaviorOnNullValues =
          DataConverter.BehaviorOnNullValues.forValue(
              config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
          );

      // Calculate the maximum possible backoff time ...
      final long maxRetryBackoffMs =
      long maxRetryBackoffMs =
          RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
      if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
        log.warn("This connector uses exponential backoff with jitter for retries, "
@@ -173,7 +173,7 @@ public class ElasticsearchSinkTask extends SinkTask {
      writer.stop();
    }
    if (client != null) {
      client.shutdown();
      client.close();
    }
  }

+3 −0
Original line number Diff line number Diff line
@@ -314,6 +314,9 @@ public class ElasticsearchWriter {
  }

  public void createIndicesForTopics(Set<String> assignedTopics) {
    if (assignedTopics == null) {
      throw new NullPointerException();
    }
    client.createIndices(indicesForTopics(assignedTopics));
  }

+9 −10
Original line number Diff line number Diff line
/**
 * Copyright 2016 Confluent Inc.
 *
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -19,7 +19,6 @@ package io.confluent.connect.elasticsearch;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonObject;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -110,8 +109,8 @@ public class Mapping {
    }
  }

  @VisibleForTesting
  public static String getElasticsearchType(ElasticsearchClient client, Schema.Type schemaType) {
  // visible for testing
  protected static String getElasticsearchType(ElasticsearchClient client, Schema.Type schemaType) {
    switch (schemaType) {
      case BOOLEAN:
        return BOOLEAN_TYPE;
@@ -129,11 +128,11 @@ public class Mapping {
        return DOUBLE_TYPE;
      case STRING:
        switch (client.getVersion()) {
          case ONE:
          case TWO:
          case ES_V1:
          case ES_V2:
            return STRING_TYPE;
          case FIVE:
          case SIX:
          case ES_V5:
          case ES_V6:
          default:
            return TEXT_TYPE;
        }
Loading