Commit ebf12ba9 authored by Ewen Cheslack-Postava's avatar Ewen Cheslack-Postava
Browse files

Merge remote-tracking branch 'origin/3.4.x'

parents 060d56de c97d3ba0
Loading
Loading
Loading
Loading

checkstyle/checkstyle.xml

deleted100644 → 0
+0 −84
Original line number Diff line number Diff line
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE module PUBLIC
    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
     "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
 // Copyright 2015 Confluent Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
 // http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-->
<module name="Checker">
  <property name="localeLanguage" value="en"/>

  <module name="FileTabCharacter"/>

  <!-- header -->
  <module name="RegexpHeader">
    <property name="header" value="/\*\*\nCopyright .* Confluent Inc."/>
  </module>

  <module name="TreeWalker">

    <!-- code cleanup -->
    <module name="UnusedImports"/>
    <module name="RedundantImport"/>
    <module name="IllegalImport" />
    <module name="EqualsHashCode"/>
    <module name="SimplifyBooleanExpression"/>
    <module name="OneStatementPerLine"/>
    <module name="SimplifyBooleanReturn"/>

    <!-- style -->
    <module name="DefaultComesLast"/>
    <module name="EmptyStatement"/>
    <module name="ArrayTypeStyle"/>
    <module name="UpperEll"/>
    <module name="LeftCurly"/>
    <module name="RightCurly"/>
    <module name="EmptyStatement"/>
    <module name="ConstantName">
      <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
    </module>
    <module name="LocalVariableName"/>
    <module name="LocalFinalVariableName"/>
    <module name="MemberName"/>
    <module name="ClassTypeParameterName">
      <property name="format" value="^[A-Z0-9]*$"/>
    </module>
    <module name="MethodTypeParameterName">
      <property name="format" value="^[A-Z0-9]*$"/>
    </module>
    <module name="PackageName"/>
    <module name="ParameterName"/>
    <module name="StaticVariableName"/>
    <module name="TypeName"/>
    <module name="AvoidStarImport"/>

    <!-- whitespace -->
    <module name="GenericWhitespace"/>
    <module name="NoWhitespaceBefore"/>
    <module name="WhitespaceAfter" />
    <module name="NoWhitespaceAfter"/>
    <module name="WhitespaceAround">
      <property name="allowEmptyConstructors" value="true"/>
      <property name="allowEmptyMethods" value="true"/>
    </module>
    <module name="Indentation">
      <property name="basicOffset" value="2"/>
      <property name="caseIndent" value="2"/>
    </module>
    <module name="MethodParamPad"/>
    <module name="ParenPad"/>
    <module name="TypecastParenPad"/>
  </module>
</module>
+27 −0
Original line number Diff line number Diff line
<?xml version="1.0"?>

<!DOCTYPE suppressions PUBLIC
        "-//Puppy Crawl//DTD Suppressions 1.1//EN"
        "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">

<suppressions>

    <!-- switch statements on types exceed maximum complexity -->
    <suppress
            checks="(CyclomaticComplexity)"
            files="(Mapping).java"
    />

    <!-- TODO: Undecided if this is too much -->
    <suppress
            checks="(ClassDataAbstractionCoupling)"
            files="(BulkProcessor).java"
    />

    <!-- TODO: Pass some parameters in common config object? -->
    <suppress
            checks="(ParameterNumber)"
            files="(ElasticsearchWriter).java"
    />

</suppressions>
+37 −33
Original line number Diff line number Diff line
@@ -4,21 +4,46 @@
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-elasticsearch</artifactId>
        <artifactId>common</artifactId>
        <version>3.5.0-SNAPSHOT</version>
    </parent>

    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-elasticsearch</artifactId>
    <packaging>jar</packaging>
    <name>kafka-connect-elasticsearch</name>
    <organization>
        <name>Confluent, Inc.</name>
        <url>http://confluent.io</url>
    </organization>
    <url>http://confluent.io</url>
    <description>
        Elasticsearch Sink Connector for Kafka Connect
    </description>

    <licenses>
        <license>
            <name>Apache License 2.0</name>
            <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
            <distribution>repo</distribution>
        </license>
    </licenses>

    <scm>
        <connection>scm:git:git://github.com/confluentinc/kafka-connect-elasticsearch.git</connection>
        <developerConnection>scm:git:git@github.com:confluentinc/kafka-connect-elasticsearch.git</developerConnection>
        <url>https://github.com/confluentinc/kafka-connect-elasticsearch</url>
        <tag>HEAD</tag>
    </scm>

    <properties>
        <confluent.version>3.5.0-SNAPSHOT</confluent.version>
        <kafka.version>0.11.1.0-SNAPSHOT</kafka.version>
        <junit.version>4.12</junit.version>
        <es.version>2.4.1</es.version>
        <lucene.version>5.5.2</lucene.version>
        <slf4j.version>1.7.5</slf4j.version>
        <jna.version>4.2.1</jna.version>
        <hamcrest.version>2.0.0.0</hamcrest.version>
        <jest.version>2.0.0</jest.version>
        <licenses.version>3.5.0-SNAPSHOT</licenses.version>
        <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
    </properties>

@@ -34,13 +59,11 @@
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-json</artifactId>
            <version>${kafka.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
@@ -48,12 +71,6 @@
            <artifactId>jest</artifactId>
            <version>${jest.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
@@ -110,16 +127,11 @@
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
                <inherited>true</inherited>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/development.xml</descriptor>
@@ -139,27 +151,19 @@
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <argLine>-Djava.awt.headless=true</argLine>
                    <argLine>-Dtests.security.manager=false</argLine>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-checkstyle-plugin</artifactId>
                <version>2.17</version>
                <executions>
                    <execution>
                        <id>validate</id>
                        <phase>validate</phase>
                        <configuration>
                            <configLocation>checkstyle/checkstyle.xml</configLocation>
                            <encoding>UTF-8</encoding>
                            <consoleOutput>true</consoleOutput>
                            <failsOnError>true</failsOnError>
                            <includeResources>false</includeResources>
                            <includeTestResources>false</includeTestResources>
                            <suppressionsLocation>checkstyle/suppressions.xml</suppressionsLocation>
                        </configuration>
                        <goals>
                            <goal>check</goal>
@@ -215,7 +219,7 @@
                                        <argument>-l ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-elasticsearch/licenses</argument>
                                        <argument>-n ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-elasticsearch/notices</argument>
                                        <argument>-t ${project.name}</argument>
                                        <argument>-x licenses-${licenses.version}.jar</argument>
                                        <argument>-x licenses-${project.version}.jar</argument>
                                    </arguments>
                                </configuration>
                                <phase>package</phase>
@@ -236,7 +240,7 @@
                            <dependency>
                                <groupId>io.confluent</groupId>
                                <artifactId>licenses</artifactId>
                                <version>${licenses.version}</version>
                                <version>${project.version}</version>
                            </dependency>
                        </dependencies>
                    </plugin>
@@ -265,7 +269,7 @@
                                        <argument>-l ${project.basedir}/licenses</argument>
                                        <argument>-n ${project.basedir}/notices</argument>
                                        <argument>-t ${project.name}</argument>
                                        <argument>-x licenses-${licenses.version}.jar</argument>
                                        <argument>-x licenses-${project.version}.jar</argument>
                                    </arguments>
                                </configuration>
                                <phase>package</phase>
@@ -286,7 +290,7 @@
                            <dependency>
                                <groupId>io.confluent</groupId>
                                <artifactId>licenses</artifactId>
                                <version>${licenses.version}</version>
                                <version>${project.version}</version>
                            </dependency>
                        </dependencies>
                    </plugin>
+1 −0
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
 * License for the specific language governing permissions and limitations under
 * the License.
 **/

package io.confluent.connect.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
+146 −81
Original line number Diff line number Diff line
@@ -46,6 +46,7 @@ import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConst
public class DataConverter {

  private static final Converter JSON_CONVERTER;

  static {
    JSON_CONVERTER = new JsonConverter();
    JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
@@ -60,7 +61,11 @@ public class DataConverter {
    if (keySchema == null) {
      schemaType = ConnectSchema.schemaType(key.getClass());
      if (schemaType == null) {
        throw new DataException("Java class " + key.getClass() + " does not have corresponding schema type.");
        throw new DataException(
            "Java class "
            + key.getClass()
            + " does not have corresponding schema type."
        );
      }
    } else {
      schemaType = keySchema.type();
@@ -78,10 +83,18 @@ public class DataConverter {
    }
  }

  public static IndexableRecord convertRecord(SinkRecord record, String index, String type, boolean ignoreKey, boolean ignoreSchema) {
  public static IndexableRecord convertRecord(
      SinkRecord record,
      String index,
      String type,
      boolean ignoreKey,
      boolean ignoreSchema
  ) {
    final String id;
    if (ignoreKey) {
      id = record.topic() + "+" + String.valueOf((int) record.kafkaPartition()) + "+" + String.valueOf(record.kafkaOffset());
      id = record.topic()
           + "+" + String.valueOf((int) record.kafkaPartition())
           + "+" + String.valueOf(record.kafkaOffset());
    } else {
      id = DataConverter.convertKey(record.keySchema(), record.key());
    }
@@ -96,15 +109,17 @@ public class DataConverter {
      value = record.value();
    }

    final String payload = new String(JSON_CONVERTER.fromConnectData(record.topic(), schema, value), StandardCharsets.UTF_8);
    byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);
    final String payload = new String(rawJsonPayload, StandardCharsets.UTF_8);
    final Long version = ignoreKey ? null : record.kafkaOffset();
    return new IndexableRecord(new Key(index, type, id), payload, version);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
  // expects a different JSON format from the current JSON converter provides. Rather than completely
  // rewrite a converter for Elasticsearch, we will refactor the JSON converter to support customized
  // translation. The pre process is no longer needed once we have the JSON converter refactored.
  // expects a different JSON format from the current JSON converter provides. Rather than
  // completely rewrite a converter for Elasticsearch, we will refactor the JSON converter to
  // support customized translation. The pre process is no longer needed once we have the JSON
  // converter refactored.
  static Schema preProcessSchema(Schema schema) {
    if (schema == null) {
      return null;
@@ -119,39 +134,53 @@ public class DataConverter {
        case Time.LOGICAL_NAME:
        case Timestamp.LOGICAL_NAME:
          return schema;
        default:
          // User type or unknown logical type
          break;
      }
    }

    Schema.Type schemaType = schema.type();
    switch (schemaType) {
      case ARRAY: {
        return copySchemaBasics(schema, SchemaBuilder.array(preProcessSchema(schema.valueSchema()))).build();
      case ARRAY:
        return preProcessArraySchema(schema);
      case MAP:
        return preProcessMapSchema(schema);
      case STRUCT:
        return preProcessStructSchema(schema);
      default:
        return schema;
    }
      case MAP: {
  }

  private static Schema preProcessArraySchema(Schema schema) {
    Schema valSchema = preProcessSchema(schema.valueSchema());
    return copySchemaBasics(schema, SchemaBuilder.array(valSchema)).build();
  }

  private static Schema preProcessMapSchema(Schema schema) {
    Schema keySchema = schema.keySchema();
    Schema valueSchema = schema.valueSchema();
    String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name();
    String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name();
    Schema preprocessedKeySchema = preProcessSchema(keySchema);
    Schema preprocessedValueSchema = preProcessSchema(valueSchema);
    if (keySchema.type() == Schema.Type.STRING) {
          return SchemaBuilder.map(preProcessSchema(keySchema), preProcessSchema(valueSchema)).build();
      return SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema).build();
    }
    Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
             .field(MAP_KEY, preProcessSchema(keySchema))
             .field(MAP_VALUE, preProcessSchema(valueSchema))
        .field(MAP_KEY, preprocessedKeySchema)
        .field(MAP_VALUE, preprocessedValueSchema)
        .build();
    return copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build();
  }
      case STRUCT: {
        SchemaBuilder structBuilder = copySchemaBasics(schema, SchemaBuilder.struct().name(schemaName));

  private static Schema preProcessStructSchema(Schema schema) {
    SchemaBuilder builder = copySchemaBasics(schema, SchemaBuilder.struct().name(schema.name()));
    for (Field field : schema.fields()) {
          structBuilder.field(field.name(), preProcessSchema(field.schema()));
        }
        return structBuilder.build();
      }
      default: {
        return schema;
      }
      builder.field(field.name(), preProcessSchema(field.schema()));
    }
    return builder.build();
  }

  private static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) {
@@ -159,18 +188,48 @@ public class DataConverter {
      target.optional();
    }
    if (source.defaultValue() != null && source.type() != Schema.Type.STRUCT) {
      final Object preProcessedDefaultValue = preProcessValue(source.defaultValue(), source, target);
      target.defaultValue(preProcessedDefaultValue);
      final Object defaultVal = preProcessValue(source.defaultValue(), source, target);
      target.defaultValue(defaultVal);
    }
    return target;
  }

  // visible for testing
  static Object preProcessValue(Object value, Schema schema, Schema newSchema) {
    // Handle missing schemas and acceptable null values
    if (schema == null) {
      return value;
    }
    if (value == null) {
      Object result = preProcessNullValue(schema);
      if (result != null) {
        return result;
      }
    }

    // Handle logical types
    String schemaName = schema.name();
    if (schemaName != null) {
      Object result = preProcessLogicalValue(schemaName, value);
      if (result != null) {
        return result;
      }
    }

    Schema.Type schemaType = schema.type();
    switch (schemaType) {
      case ARRAY:
        return preProcessArrayValue(value, schema, newSchema);
      case MAP:
        return preProcessMapValue(value, schema, newSchema);
      case STRUCT:
        return preProcessStructValue(value, schema, newSchema);
      default:
        return value;
    }
  }

  private static Object preProcessNullValue(Schema schema) {
    if (schema.defaultValue() != null) {
      return schema.defaultValue();
    }
@@ -180,9 +239,8 @@ public class DataConverter {
    throw new DataException("null value for field that is required and has no default value");
  }

    // Handle logical types
    String schemaName = schema.name();
    if (schemaName != null) {
  // @returns the decoded logical value or null if this isn't a known logical type
  private static Object preProcessLogicalValue(String schemaName, Object value) {
    switch (schemaName) {
      case Decimal.LOGICAL_NAME:
        return ((BigDecimal) value).doubleValue();
@@ -190,19 +248,22 @@ public class DataConverter {
      case Time.LOGICAL_NAME:
      case Timestamp.LOGICAL_NAME:
        return value;
      default:
        // User-defined type or unknown built-in
        return null;
    }
  }

    Schema.Type schemaType = schema.type();
    switch (schemaType) {
      case ARRAY:
  private static Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) {
    Collection collection = (Collection) value;
    List<Object> result = new ArrayList<>();
    for (Object element: collection) {
      result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema()));
    }
    return result;
      case MAP:
  }

  private static Object preProcessMapValue(Object value, Schema schema, Schema newSchema) {
    Schema keySchema = schema.keySchema();
    Schema valueSchema = schema.valueSchema();
    Schema newValueSchema = newSchema.valueSchema();
@@ -210,29 +271,33 @@ public class DataConverter {
    if (keySchema.type() == Schema.Type.STRING) {
      Map<Object, Object> processedMap = new HashMap<>();
      for (Map.Entry<?, ?> entry: map.entrySet()) {
            processedMap.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()),
                preProcessValue(entry.getValue(), valueSchema, newValueSchema));
        processedMap.put(
            preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()),
            preProcessValue(entry.getValue(), valueSchema, newValueSchema)
        );
      }
      return processedMap;
    }
    List<Struct> mapStructs = new ArrayList<>();
    for (Map.Entry<?, ?> entry: map.entrySet()) {
      Struct mapStruct = new Struct(newValueSchema);
          mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, newValueSchema.field(MAP_KEY).schema()));
          mapStruct.put(MAP_VALUE, preProcessValue(entry.getValue(), valueSchema, newValueSchema.field(MAP_VALUE).schema()));
      Schema mapKeySchema = newValueSchema.field(MAP_KEY).schema();
      Schema mapValueSchema = newValueSchema.field(MAP_VALUE).schema();
      mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, mapKeySchema));
      mapStruct.put(MAP_VALUE, preProcessValue(entry.getValue(), valueSchema, mapValueSchema));
      mapStructs.add(mapStruct);
    }
    return mapStructs;
      case STRUCT:
  }

  private static Object preProcessStructValue(Object value, Schema schema, Schema newSchema) {
    Struct struct = (Struct) value;
    Struct newStruct = new Struct(newSchema);
    for (Field field : schema.fields()) {
          Object converted =  preProcessValue(struct.get(field), field.schema(), newSchema.field(field.name()).schema());
      Schema newFieldSchema = newSchema.field(field.name()).schema();
      Object converted = preProcessValue(struct.get(field), field.schema(), newFieldSchema);
      newStruct.put(field.name(), converted);
    }
    return newStruct;
      default:
        return value;
    }
  }
}
Loading