Commit 361bd4d2 authored by Robert Yokota's avatar Robert Yokota
Browse files

Merge branch '4.0.x'

parents 7de257e5 0ff9c21a
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
    <!-- TODO: Undecided if this is too much -->
    <suppress
            checks="(ClassDataAbstractionCoupling)"
            files="(BulkProcessor).java"
            files="(BulkProcessor|JestElasticsearchClient).java"
    />

    <!-- TODO: Pass some parameters in common config object? -->
+9 −9
Original line number Diff line number Diff line
@@ -41,9 +41,9 @@
    <properties>
        <es.version>2.4.1</es.version>
        <lucene.version>5.5.2</lucene.version>
        <jna.version>4.2.1</jna.version>
        <hamcrest.version>2.0.0.0</hamcrest.version>
        <jest.version>2.0.0</jest.version>
        <hamcrest.version>1.3</hamcrest.version>
        <mockito.version>2.13.0</mockito.version>
        <jest.version>2.4.0</jest.version>
        <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
    </properties>

@@ -72,15 +72,15 @@
            <version>${jest.version}</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>${jna.version}</version>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>${hamcrest.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-junit</artifactId>
            <version>${hamcrest.version}</version>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>${mockito.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
+10 −63
Original line number Diff line number Diff line
@@ -16,82 +16,29 @@

package io.confluent.connect.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import io.confluent.connect.elasticsearch.bulk.BulkClient;
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;

public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {

  private static final Logger LOG = LoggerFactory.getLogger(BulkIndexingClient.class);
import java.io.IOException;
import java.util.List;

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public class BulkIndexingClient implements BulkClient<IndexableRecord, BulkRequest> {

  private final JestClient client;
  private final ElasticsearchClient client;

  public BulkIndexingClient(JestClient client) {
  public BulkIndexingClient(ElasticsearchClient client) {
    this.client = client;
  }

  @Override
  public Bulk bulkRequest(List<IndexableRecord> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexableRecord record : batch) {
      builder.addAction(record.toBulkableAction());
    }
    return builder.build();
  public BulkRequest bulkRequest(List<IndexableRecord> batch) {
    return client.createBulkRequest(batch);
  }

  @Override
  public BulkResponse execute(Bulk bulk) throws IOException {
    final BulkResult result = client.execute(bulk);

    if (result.isSucceeded()) {
      return BulkResponse.success();
    }

    boolean retriable = true;

    final List<Key> versionConflicts = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    for (BulkResult.BulkResultItem item : result.getItems()) {
      if (item.error != null) {
        final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(item.error);
        final String errorType = parsedError.get("type").asText("");
        if ("version_conflict_engine_exception".equals(errorType)) {
          versionConflicts.add(new Key(item.index, item.type, item.id));
        } else if ("mapper_parse_exception".equals(errorType)) {
          retriable = false;
          errors.add(item.error);
        } else {
          errors.add(item.error);
        }
      }
    }

    if (!versionConflicts.isEmpty()) {
      LOG.debug("Ignoring version conflicts for items: {}", versionConflicts);
      if (errors.isEmpty()) {
        // The only errors were version conflicts
        return BulkResponse.success();
      }
    }

    final String errorInfo = errors.isEmpty() ? result.getErrorMessage() : errors.toString();

    return BulkResponse.failure(retriable, errorInfo);
  public BulkResponse execute(BulkRequest bulk) throws IOException {
    return client.executeBulk(bulk);
  }

}
+99 −0
Original line number Diff line number Diff line
/**
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 **/

package io.confluent.connect.elasticsearch;

import com.google.gson.JsonObject;
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import org.apache.kafka.connect.data.Schema;

import java.io.IOException;
import java.util.List;
import java.util.Set;

public interface ElasticsearchClient extends AutoCloseable {

  enum Version {
    ES_V1, ES_V2, ES_V5, ES_V6
  }

  /**
   * Gets the Elasticsearch version.
   *
   * @return the version, not null
   */
  Version getVersion();

  /**
   * Creates indices.
   *
   * @param indices the set of index names to create, not null
   */
  void createIndices(Set<String> indices);

  /**
   * Creates an explicit mapping.
   *
   * @param index the index to write
   * @param type the type for which to create the mapping
   * @param schema the schema used to infer the mapping
   * @throws IOException if the client cannot execute the request
   */
  void createMapping(String index, String type, Schema schema) throws IOException;

  /**
   * Gets the JSON mapping for the given index and type. Returns {@code null} if it does not exist.
   *
   * @param index the index
   * @param type the type
   * @throws IOException if the client cannot execute the request
   */
  JsonObject getMapping(String index, String type) throws IOException;

  /**
   * Creates a bulk request for the list of {@link IndexableRecord} records.
   *
   * @param batch the list of records
   * @return the bulk request
   */
  BulkRequest createBulkRequest(List<IndexableRecord> batch);

  /**
   * Executes a bulk action.
   *
   * @param bulk the bulk request
   * @return the bulk response
   * @throws IOException if the client cannot execute the request
   */
  BulkResponse executeBulk(BulkRequest bulk) throws IOException;

  /**
   * Executes a search.
   *
   * @param query the search query
   * @param index the index to search
   * @param type the type to search
   * @return the search result
   * @throws IOException if the client cannot execute the request
   */
  JsonObject search(String query, String index, String type) throws IOException;

  /**
   * Shuts down the client.
   */
  void close();
}
+2 −19
Original line number Diff line number Diff line
@@ -16,11 +16,6 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.connect.data.Schema.Type;

import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSinkConnectorConstants {
  public static final String MAP_KEY = "key";
  public static final String MAP_VALUE = "value";
@@ -34,19 +29,7 @@ public class ElasticsearchSinkConnectorConstants {
  public static final String FLOAT_TYPE = "float";
  public static final String DOUBLE_TYPE = "double";
  public static final String STRING_TYPE = "string";
  public static final String TEXT_TYPE = "text";
  public static final String KEYWORD_TYPE = "keyword";
  public static final String DATE_TYPE = "date";

  static final Map<Type, String> TYPES = new HashMap<>();

  static {
    TYPES.put(Type.BOOLEAN, BOOLEAN_TYPE);
    TYPES.put(Type.INT8, BYTE_TYPE);
    TYPES.put(Type.INT16, SHORT_TYPE);
    TYPES.put(Type.INT32, INTEGER_TYPE);
    TYPES.put(Type.INT64, LONG_TYPE);
    TYPES.put(Type.FLOAT32, FLOAT_TYPE);
    TYPES.put(Type.FLOAT64, DOUBLE_TYPE);
    TYPES.put(Type.STRING, STRING_TYPE);
    TYPES.put(Type.BYTES, BINARY_TYPE);
  }
}
Loading