Commit f02277c3 authored by Shikhar Bhushan's avatar Shikhar Bhushan Committed by GitHub
Browse files

Merge pull request #36 from confluentinc/opt-and-defval

Support optional & default values when pre-processing schemas & values	
parents bd2768ed f22558f6
Loading
Loading
Loading
Loading
+43 −23
Original line number Diff line number Diff line
@@ -53,11 +53,13 @@ public class DataConverter {
    if (key == null) {
      throw new ConnectException("Key is used as document id and can not be null.");
    }
    Schema.Type schemaType;

    final Schema.Type schemaType;
    if (keySchema == null) {
      schemaType = ConnectSchema.schemaType(key.getClass());
      if (schemaType == null)
      if (schemaType == null) {
        throw new DataException("Java class " + key.getClass() + " does not have corresponding schema type.");
      }
    } else {
      schemaType = keySchema.type();
    }
@@ -105,13 +107,11 @@ public class DataConverter {
      return null;
    }
    // Handle logical types
    SchemaBuilder builder;
    String schemaName = schema.name();
    if (schemaName != null) {
      switch (schemaName) {
        case Decimal.LOGICAL_NAME:
          builder = SchemaBuilder.float64();
          return builder.build();
          return copySchemaBasics(schema, SchemaBuilder.float64()).build();
        case Date.LOGICAL_NAME:
        case Time.LOGICAL_NAME:
        case Timestamp.LOGICAL_NAME:
@@ -120,39 +120,59 @@ public class DataConverter {
    }

    Schema.Type schemaType = schema.type();
    Schema keySchema;
    Schema valueSchema;
    switch (schemaType) {
      case ARRAY:
        valueSchema = schema.valueSchema();
        builder = SchemaBuilder.array(preProcessSchema(valueSchema));
        return builder.build();
      case MAP:
        keySchema = schema.keySchema();
        valueSchema = schema.valueSchema();
      case ARRAY: {
        return copySchemaBasics(schema, SchemaBuilder.array(preProcessSchema(schema.valueSchema()))).build();
      }
      case MAP: {
        Schema keySchema = schema.keySchema();
        Schema valueSchema = schema.valueSchema();
        String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name();
        String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name();
        builder = SchemaBuilder.array(SchemaBuilder.struct().name(keyName + "-" + valueName)
        Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
            .field(MAP_KEY, preProcessSchema(keySchema))
            .field(MAP_VALUE, preProcessSchema(valueSchema))
            .build());
        return builder.build();
      case STRUCT:
        builder = SchemaBuilder.struct().name(schema.name());
            .build();
        return copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build();
      }
      case STRUCT: {
        SchemaBuilder structBuilder = copySchemaBasics(schema, SchemaBuilder.struct().name(schemaName));
        for (Field field : schema.fields()) {
          builder.field(field.name(), preProcessSchema(field.schema()));
          structBuilder.field(field.name(), preProcessSchema(field.schema()));
        }
        return builder.build();
      default:
        return structBuilder.build();
      }
      default: {
        return schema;
      }
    }
  }

  private static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) {
    if (source.isOptional()) {
      target.optional();
    }
    if (source.defaultValue() != null && source.type() != Schema.Type.STRUCT) {
      final Object preProcessedDefaultValue = preProcessValue(source.defaultValue(), source, target);
      target.defaultValue(preProcessedDefaultValue);
    }
    return target;
  }

  // visible for testing
  static Object preProcessValue(Object value, Schema schema, Schema newSchema) {
    if (schema == null) {
      return value;
    }
    if (value == null) {
      if (schema.defaultValue() != null) {
        return schema.defaultValue();
      }
      if (schema.isOptional()) {
        return null;
      }
      throw new DataException("null value for field that is required and has no default value");
    }

    // Handle logical types
    String schemaName = schema.name();
+140 −84
Original line number Diff line number Diff line
@@ -23,12 +23,10 @@ import org.apache.kafka.connect.data.Struct;
import org.junit.Test;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

@@ -36,90 +34,148 @@ import static org.junit.Assert.assertEquals;

public class DataConverterTest {

  private static final int SCALE = 2;
  private static final Schema DECIMAL_SCHEMA = Decimal.schema(SCALE);

  @Test
  public void testPreProcessSchema() {
    SchemaBuilder builder;
    Schema schema;
    Schema newSchema;

    newSchema = DataConverter.preProcessSchema(DECIMAL_SCHEMA);
    assertEquals(Schema.FLOAT64_SCHEMA, newSchema);

    builder = SchemaBuilder.array(Decimal.schema(SCALE));
    schema = builder.build();
    newSchema = DataConverter.preProcessSchema(schema);
    assertEquals(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), newSchema);

    builder = SchemaBuilder.map(Schema.STRING_SCHEMA, DECIMAL_SCHEMA);
    schema = builder.build();
    newSchema = DataConverter.preProcessSchema(schema);
    String keyName = Schema.STRING_SCHEMA.type().name();
    String valueName = Decimal.LOGICAL_NAME;
    Schema expectedSchema = SchemaBuilder.array(SchemaBuilder.struct().name(keyName + "-" + valueName)
        .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.STRING_SCHEMA)
        .field(ElasticsearchSinkConnectorConstants.MAP_VALUE, Schema.FLOAT64_SCHEMA)
        .build()).build();
    assertEquals(expectedSchema, newSchema);

    builder = SchemaBuilder.struct().name("struct")
        .field("decimal", DECIMAL_SCHEMA);
    schema = builder.schema();
    newSchema = DataConverter.preProcessSchema(schema);
    expectedSchema = SchemaBuilder.struct().name("struct")
        .field("decimal", Schema.FLOAT64_SCHEMA)
        .build();
    assertEquals(expectedSchema, newSchema);
  public void primitives() {
    assertIdenticalAfterPreProcess(Schema.INT8_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.INT16_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.INT32_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.INT64_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.FLOAT32_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.FLOAT64_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.BOOLEAN_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.STRING_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.BYTES_SCHEMA);

    assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT16_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT32_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT64_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.OPTIONAL_FLOAT32_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.OPTIONAL_FLOAT64_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.OPTIONAL_BOOLEAN_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.OPTIONAL_STRING_SCHEMA);
    assertIdenticalAfterPreProcess(Schema.OPTIONAL_BYTES_SCHEMA);

    assertIdenticalAfterPreProcess(SchemaBuilder.int8().defaultValue((byte) 42).build());
    assertIdenticalAfterPreProcess(SchemaBuilder.int16().defaultValue((short) 42).build());
    assertIdenticalAfterPreProcess(SchemaBuilder.int32().defaultValue(42).build());
    assertIdenticalAfterPreProcess(SchemaBuilder.int64().defaultValue(42L).build());
    assertIdenticalAfterPreProcess(SchemaBuilder.float32().defaultValue(42.0f).build());
    assertIdenticalAfterPreProcess(SchemaBuilder.float64().defaultValue(42.0d).build());
    assertIdenticalAfterPreProcess(SchemaBuilder.bool().defaultValue(true).build());
    assertIdenticalAfterPreProcess(SchemaBuilder.string().defaultValue("foo").build());
    assertIdenticalAfterPreProcess(SchemaBuilder.bytes().defaultValue(new byte[0]).build());
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testPreProcessValue() {
    double expectedValue = 0.02;
    byte[] bytes = ByteBuffer.allocate(4).putInt(2).array();
    BigDecimal value = new BigDecimal(new BigInteger(bytes), SCALE);

    SchemaBuilder builder;
    Schema schema;
    Schema newSchema;

    newSchema = DataConverter.preProcessSchema(DECIMAL_SCHEMA);

    Object newValue = DataConverter.preProcessValue(value, DECIMAL_SCHEMA, newSchema);
    assertEquals(expectedValue, newValue);

    builder = SchemaBuilder.array(DECIMAL_SCHEMA);
    schema = builder.build();
    newSchema = DataConverter.preProcessSchema(schema);
    ArrayList<Object> values = new ArrayList<>();
    values.add(value);
    newValue = DataConverter.preProcessValue(values, schema, newSchema);
    List<Object> result = (List<Object>) newValue;
    for (Object element: result) {
      assertEquals(expectedValue, element);
  private void assertIdenticalAfterPreProcess(Schema schema) {
    assertEquals(schema, DataConverter.preProcessSchema(schema));
  }

    builder = SchemaBuilder.map(Schema.STRING_SCHEMA, DECIMAL_SCHEMA);
    schema = builder.build();
    newSchema = DataConverter.preProcessSchema(schema);

    Map<Object, Object> original = new HashMap<>();
    original.put("field1", value);
    original.put("field2", value);
  @Test
  public void decimal() {
    Schema origSchema = Decimal.schema(2);
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(Schema.FLOAT64_SCHEMA, preProcessedSchema);

    assertEquals(0.02, DataConverter.preProcessValue(new BigDecimal("0.02"), origSchema, preProcessedSchema));

    // optional
    assertEquals(
        Schema.OPTIONAL_FLOAT64_SCHEMA,
        DataConverter.preProcessSchema(Decimal.builder(2).optional().build())
    );

    // defval
    assertEquals(
        SchemaBuilder.float64().defaultValue(0.00).build(),
        DataConverter.preProcessSchema(Decimal.builder(2).defaultValue(new BigDecimal("0.00")).build())
    );
  }

    List<Struct> structs = new LinkedList<>();
    Struct expected = new Struct(newSchema.valueSchema());
    expected.put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field1");
    expected.put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02);
    structs.add(expected);
  @Test
  public void array() {
    Schema origSchema = SchemaBuilder.array(Decimal.schema(2)).schema();
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), preProcessedSchema);

    assertEquals(
        Arrays.asList(0.02, 0.42),
        DataConverter.preProcessValue(Arrays.asList(new BigDecimal("0.02"), new BigDecimal("0.42")), origSchema, preProcessedSchema)
    );

    // optional
    assertEquals(
        SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(),
        DataConverter.preProcessSchema(SchemaBuilder.array(Decimal.schema(2)).optional().build())
    );

    // defval
    assertEquals(
        SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(),
        DataConverter.preProcessSchema(SchemaBuilder.array(Decimal.schema(2)).defaultValue(Collections.emptyList()).build())
    );
  }

    expected = new Struct(newSchema.valueSchema());
    expected.put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field2");
    expected.put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02);
    structs.add(expected);
  @Test
  public void map() {
    Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).build();
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(
        SchemaBuilder.array(
            SchemaBuilder.struct().name(Schema.STRING_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME)
                .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.STRING_SCHEMA)
                .field(ElasticsearchSinkConnectorConstants.MAP_VALUE, Schema.FLOAT64_SCHEMA)
                .build()
        ).build(),
        preProcessedSchema
    );

    Map<Object, Object> origValue = new HashMap<>();
    origValue.put("field1", new BigDecimal("0.02"));
    origValue.put("field2", new BigDecimal("0.42"));
    assertEquals(
        new HashSet<>(Arrays.asList(
            new Struct(preProcessedSchema.valueSchema())
                .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field1")
                .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02),
            new Struct(preProcessedSchema.valueSchema())
                .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field2")
                .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.42)
        )),
        new HashSet<>((List) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema))
    );

    // optional
    assertEquals(
        SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(),
        DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).optional().build())
    );

    // defval
    assertEquals(
        SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(),
        DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build())
    );
  }

    assertEquals(new HashSet<>(structs), new HashSet<>((List<?>) DataConverter.preProcessValue(original, schema, newSchema)));
  @Test
  public void struct() {
    Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).build();
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(
        SchemaBuilder.struct().name("struct").field("decimal", Schema.FLOAT64_SCHEMA).build(),
        preProcessedSchema
    );

    assertEquals(
        new Struct(preProcessedSchema).put("decimal", 0.02),
        DataConverter.preProcessValue(new Struct(origSchema).put("decimal", new BigDecimal("0.02")), origSchema, preProcessedSchema)
    );

    // optional
    assertEquals(
        SchemaBuilder.struct().name("struct").field("decimal", Schema.FLOAT64_SCHEMA).optional().build(),
        DataConverter.preProcessSchema(SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).optional().build())
    );
  }

}