Commit 36e4547c authored by Chris Egerton
Browse files

Handle null values in optional struct fields with no defaults

parent d3ef73cb
Loading
Loading
Loading
Loading
+5 −5
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

package io.confluent.connect.elasticsearch;

import com.sun.istack.internal.NotNull;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
@@ -235,7 +236,8 @@ public class DataConverter {
    Schema preprocessedKeySchema = preProcessSchema(keySchema);
    Schema preprocessedValueSchema = preProcessSchema(valueSchema);
    if (useCompactMapEntries && keySchema.type() == Schema.Type.STRING) {
      return SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema).build();
      SchemaBuilder result = SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema);
      return copySchemaBasics(schema, result).build();
    }
    Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
        .field(MAP_KEY, preprocessedKeySchema)
@@ -269,11 +271,9 @@ public class DataConverter {
    if (schema == null) {
      return value;
    }
    
    if (value == null) {
      Object result = preProcessNullValue(schema);
      if (result != null) {
        return result;
      }
      return preProcessNullValue(schema);
    }

    // Handle logical types
+33 −4
Original line number Diff line number Diff line
@@ -16,10 +16,7 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Before;
@@ -263,6 +260,38 @@ public class DataConverterTest {
    );
  }

  @Test
  public void optionalFieldsWithoutDefaults() {
    // Exercise every schema category, in order: a single primitive (one is
    // enough), the logical types, then the complex types.
    SchemaBuilder[] fieldSchemaBuilders = new SchemaBuilder[] {
        SchemaBuilder.bool(),
        Decimal.builder(2),
        Time.builder(),
        Timestamp.builder(),
        SchemaBuilder.array(Schema.BOOLEAN_SCHEMA),
        SchemaBuilder.struct().field("innerField", Schema.BOOLEAN_SCHEMA),
        SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)
    };
    for (SchemaBuilder fieldSchemaBuilder : fieldSchemaBuilders) {
      testOptionalFieldWithoutDefault(fieldSchemaBuilder);
    }
    // Maps are converted differently when compact map entries are disabled,
    // so the map case has to be repeated under that configuration as well.
    converter = new DataConverter(false, BehaviorOnNullValues.DEFAULT);
    testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA));
  }

  /**
   * Asserts that a struct carrying a null value in an optional field with no
   * default survives both schema and value pre-processing: the converted value
   * must be a struct (under the converted schema) whose field is still null.
   *
   * @param optionalFieldSchema builder for the field's schema; it is marked
   *                            optional and built by this method
   */
  private void testOptionalFieldWithoutDefault(
    SchemaBuilder optionalFieldSchema
  ) {
    Schema fieldSchema = optionalFieldSchema.optional().build();
    Schema inputSchema =
        SchemaBuilder.struct().name("struct").field("optionalField", fieldSchema);
    Schema convertedSchema = converter.preProcessSchema(inputSchema);

    Struct inputValue = new Struct(inputSchema).put("optionalField", null);
    Object convertedValue =
        converter.preProcessValue(inputValue, inputSchema, convertedSchema);

    // The null must be carried through rather than rejected or replaced.
    assertEquals(new Struct(convertedSchema).put("optionalField", null), convertedValue);
  }

  @Test
  public void ignoreOnNullValue() {
    converter = new DataConverter(true, BehaviorOnNullValues.IGNORE);