Commit 11c4c561 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

refactored DataConverterTest

parent ded189fc
Loading
Loading
Loading
Loading
+62 −82
Original line number Diff line number Diff line
@@ -23,12 +23,9 @@ import org.apache.kafka.connect.data.Struct;
import org.junit.Test;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

@@ -36,90 +33,73 @@ import static org.junit.Assert.assertEquals;

public class DataConverterTest {

  private static final int SCALE = 2;
  private static final Schema DECIMAL_SCHEMA = Decimal.schema(SCALE);

  @Test
  public void testPreProcessSchema() {
    SchemaBuilder builder;
    Schema schema;
    Schema newSchema;
  public void primitivePassThrough() {
    assertEquals(Schema.INT64_SCHEMA, DataConverter.preProcessSchema(Schema.INT64_SCHEMA));
    assertEquals(Schema.OPTIONAL_INT64_SCHEMA, DataConverter.preProcessSchema(Schema.OPTIONAL_INT64_SCHEMA));
  }

    newSchema = DataConverter.preProcessSchema(DECIMAL_SCHEMA);
    assertEquals(Schema.FLOAT64_SCHEMA, newSchema);
  @Test
  public void decimal() {
    Schema origSchema = Decimal.schema(2);
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(Schema.FLOAT64_SCHEMA, preProcessedSchema);
    assertEquals(0.02, DataConverter.preProcessValue(new BigDecimal("0.02"), origSchema, preProcessedSchema));
  }

    builder = SchemaBuilder.array(Decimal.schema(SCALE));
    schema = builder.build();
    newSchema = DataConverter.preProcessSchema(schema);
    assertEquals(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), newSchema);
  @Test
  public void array() {
    Schema origSchema = SchemaBuilder.array(Decimal.schema(2)).schema();
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), preProcessedSchema);
    assertEquals(
        Arrays.asList(0.02, 0.42),
        DataConverter.preProcessValue(Arrays.asList(new BigDecimal("0.02"), new BigDecimal("0.42")), origSchema, preProcessedSchema)
    );
  }

    builder = SchemaBuilder.map(Schema.STRING_SCHEMA, DECIMAL_SCHEMA);
    schema = builder.build();
    newSchema = DataConverter.preProcessSchema(schema);
    String keyName = Schema.STRING_SCHEMA.type().name();
    String valueName = Decimal.LOGICAL_NAME;
    Schema expectedSchema = SchemaBuilder.array(SchemaBuilder.struct().name(keyName + "-" + valueName)
  @Test
  public void map() {
    Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).build();
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(
        SchemaBuilder.array(
            SchemaBuilder.struct().name(Schema.STRING_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME)
                .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.STRING_SCHEMA)
                .field(ElasticsearchSinkConnectorConstants.MAP_VALUE, Schema.FLOAT64_SCHEMA)
        .build()).build();
    assertEquals(expectedSchema, newSchema);

    builder = SchemaBuilder.struct().name("struct")
        .field("decimal", DECIMAL_SCHEMA);
    schema = builder.schema();
    newSchema = DataConverter.preProcessSchema(schema);
    expectedSchema = SchemaBuilder.struct().name("struct")
        .field("decimal", Schema.FLOAT64_SCHEMA)
        .build();
    assertEquals(expectedSchema, newSchema);
                .build()
        ).build(),
        preProcessedSchema
    );

    Map<Object, Object> origValue = new HashMap<>();
    origValue.put("field1", new BigDecimal("0.02"));
    origValue.put("field2", new BigDecimal("0.42"));
    assertEquals(
        new HashSet<>(Arrays.asList(
            new Struct(preProcessedSchema.valueSchema())
                .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field1")
                .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02),
            new Struct(preProcessedSchema.valueSchema())
                .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field2")
                .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.42)
        )),
        new HashSet<>((List) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema))
    );
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testPreProcessValue() {
    double expectedValue = 0.02;
    byte[] bytes = ByteBuffer.allocate(4).putInt(2).array();
    BigDecimal value = new BigDecimal(new BigInteger(bytes), SCALE);

    SchemaBuilder builder;
    Schema schema;
    Schema newSchema;

    newSchema = DataConverter.preProcessSchema(DECIMAL_SCHEMA);

    Object newValue = DataConverter.preProcessValue(value, DECIMAL_SCHEMA, newSchema);
    assertEquals(expectedValue, newValue);

    builder = SchemaBuilder.array(DECIMAL_SCHEMA);
    schema = builder.build();
    newSchema = DataConverter.preProcessSchema(schema);
    ArrayList<Object> values = new ArrayList<>();
    values.add(value);
    newValue = DataConverter.preProcessValue(values, schema, newSchema);
    List<Object> result = (List<Object>) newValue;
    for (Object element: result) {
      assertEquals(expectedValue, element);
  public void struct() {
    Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).schema();
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(
        SchemaBuilder.struct().name("struct").field("decimal", Schema.FLOAT64_SCHEMA).build(),
        preProcessedSchema
    );
    assertEquals(
        new Struct(preProcessedSchema).put("decimal", 0.02),
        DataConverter.preProcessValue(new Struct(origSchema).put("decimal", new BigDecimal("0.02")), origSchema, preProcessedSchema)
    );
  }

    builder = SchemaBuilder.map(Schema.STRING_SCHEMA, DECIMAL_SCHEMA);
    schema = builder.build();
    newSchema = DataConverter.preProcessSchema(schema);

    Map<Object, Object> original = new HashMap<>();
    original.put("field1", value);
    original.put("field2", value);

    List<Struct> structs = new LinkedList<>();
    Struct expected = new Struct(newSchema.valueSchema());
    expected.put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field1");
    expected.put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02);
    structs.add(expected);

    expected = new Struct(newSchema.valueSchema());
    expected.put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field2");
    expected.put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02);
    structs.add(expected);

    assertEquals(new HashSet<>(structs), new HashSet<>((List<?>) DataConverter.preProcessValue(original, schema, newSchema)));
  }
}