Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +3 −2 Original line number Diff line number Diff line Loading @@ -152,8 +152,9 @@ public class DataConverter { if (source.isOptional()) { target.optional(); } if (source.defaultValue() != null) { target.defaultValue(source.defaultValue()); if (source.defaultValue() != null && source.type() != Schema.Type.STRUCT) { final Object preProcessedDefaultValue = preProcessValue(source.defaultValue(), source, target); target.defaultValue(preProcessedDefaultValue); } return target; } Loading src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +80 −4 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ import org.junit.Test; import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; Loading @@ -34,9 +35,39 @@ import static org.junit.Assert.assertEquals; public class DataConverterTest { @Test public void primitivePassThrough() { assertEquals(Schema.INT64_SCHEMA, DataConverter.preProcessSchema(Schema.INT64_SCHEMA)); assertEquals(Schema.OPTIONAL_INT64_SCHEMA, DataConverter.preProcessSchema(Schema.OPTIONAL_INT64_SCHEMA)); public void primitives() { assertIdenticalAfterPreProcess(Schema.INT8_SCHEMA); assertIdenticalAfterPreProcess(Schema.INT16_SCHEMA); assertIdenticalAfterPreProcess(Schema.INT32_SCHEMA); assertIdenticalAfterPreProcess(Schema.INT64_SCHEMA); assertIdenticalAfterPreProcess(Schema.FLOAT32_SCHEMA); assertIdenticalAfterPreProcess(Schema.FLOAT64_SCHEMA); assertIdenticalAfterPreProcess(Schema.BOOLEAN_SCHEMA); assertIdenticalAfterPreProcess(Schema.STRING_SCHEMA); assertIdenticalAfterPreProcess(Schema.BYTES_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT16_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT32_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT64_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_FLOAT32_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_FLOAT64_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_BOOLEAN_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_STRING_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_BYTES_SCHEMA); assertIdenticalAfterPreProcess(SchemaBuilder.int8().defaultValue((byte) 42).build()); assertIdenticalAfterPreProcess(SchemaBuilder.int16().defaultValue((short) 42).build()); assertIdenticalAfterPreProcess(SchemaBuilder.int32().defaultValue(42).build()); assertIdenticalAfterPreProcess(SchemaBuilder.int64().defaultValue(42L).build()); assertIdenticalAfterPreProcess(SchemaBuilder.float32().defaultValue(42.0f).build()); assertIdenticalAfterPreProcess(SchemaBuilder.float64().defaultValue(42.0d).build()); assertIdenticalAfterPreProcess(SchemaBuilder.bool().defaultValue(true).build()); assertIdenticalAfterPreProcess(SchemaBuilder.string().defaultValue("foo").build()); assertIdenticalAfterPreProcess(SchemaBuilder.bytes().defaultValue(new byte[0]).build()); } private void assertIdenticalAfterPreProcess(Schema schema) { assertEquals(schema, DataConverter.preProcessSchema(schema)); } @Test Loading @@ -44,7 +75,20 @@ public class DataConverterTest { Schema origSchema = Decimal.schema(2); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals(Schema.FLOAT64_SCHEMA, preProcessedSchema); assertEquals(0.02, DataConverter.preProcessValue(new BigDecimal("0.02"), origSchema, preProcessedSchema)); // optional assertEquals( Schema.OPTIONAL_FLOAT64_SCHEMA, DataConverter.preProcessSchema(Decimal.builder(2).optional().build()) ); // defval assertEquals( SchemaBuilder.float64().defaultValue(0.00).build(), DataConverter.preProcessSchema(Decimal.builder(2).defaultValue(new BigDecimal("0.00")).build()) ); } @Test Loading @@ -52,10 +96,23 @@ public class DataConverterTest { Schema origSchema = SchemaBuilder.array(Decimal.schema(2)).schema(); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), preProcessedSchema); assertEquals( Arrays.asList(0.02, 0.42), DataConverter.preProcessValue(Arrays.asList(new BigDecimal("0.02"), new BigDecimal("0.42")), origSchema, preProcessedSchema) ); // optional assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(), DataConverter.preProcessSchema(SchemaBuilder.array(Decimal.schema(2)).optional().build()) ); // defval assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(), DataConverter.preProcessSchema(SchemaBuilder.array(Decimal.schema(2)).defaultValue(Collections.emptyList()).build()) ); } @Test Loading Loading @@ -86,20 +143,39 @@ public class DataConverterTest { )), new HashSet<>((List) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema)) ); // optional assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(), DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).optional().build()) ); // defval assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(), DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build()) ); } @Test public void struct() { Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).schema(); Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).build(); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals( SchemaBuilder.struct().name("struct").field("decimal", Schema.FLOAT64_SCHEMA).build(), preProcessedSchema ); assertEquals( new Struct(preProcessedSchema).put("decimal", 0.02), DataConverter.preProcessValue(new Struct(origSchema).put("decimal", new BigDecimal("0.02")), origSchema, preProcessedSchema) ); // optional assertEquals( SchemaBuilder.struct().name("struct").field("decimal", Schema.FLOAT64_SCHEMA).optional().build(), DataConverter.preProcessSchema(SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).optional().build()) ); } } Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +3 −2 Original line number Diff line number Diff line Loading @@ -152,8 +152,9 @@ public class DataConverter { if (source.isOptional()) { target.optional(); } if (source.defaultValue() != null) { target.defaultValue(source.defaultValue()); if (source.defaultValue() != null && source.type() != Schema.Type.STRUCT) { final Object preProcessedDefaultValue = preProcessValue(source.defaultValue(), source, target); target.defaultValue(preProcessedDefaultValue); } return target; } Loading
src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +80 −4 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ import org.junit.Test; import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; Loading @@ -34,9 +35,39 @@ import static org.junit.Assert.assertEquals; public class DataConverterTest { @Test public void primitivePassThrough() { assertEquals(Schema.INT64_SCHEMA, DataConverter.preProcessSchema(Schema.INT64_SCHEMA)); assertEquals(Schema.OPTIONAL_INT64_SCHEMA, DataConverter.preProcessSchema(Schema.OPTIONAL_INT64_SCHEMA)); public void primitives() { assertIdenticalAfterPreProcess(Schema.INT8_SCHEMA); assertIdenticalAfterPreProcess(Schema.INT16_SCHEMA); assertIdenticalAfterPreProcess(Schema.INT32_SCHEMA); assertIdenticalAfterPreProcess(Schema.INT64_SCHEMA); assertIdenticalAfterPreProcess(Schema.FLOAT32_SCHEMA); assertIdenticalAfterPreProcess(Schema.FLOAT64_SCHEMA); assertIdenticalAfterPreProcess(Schema.BOOLEAN_SCHEMA); assertIdenticalAfterPreProcess(Schema.STRING_SCHEMA); assertIdenticalAfterPreProcess(Schema.BYTES_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT16_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT32_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_INT64_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_FLOAT32_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_FLOAT64_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_BOOLEAN_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_STRING_SCHEMA); assertIdenticalAfterPreProcess(Schema.OPTIONAL_BYTES_SCHEMA); assertIdenticalAfterPreProcess(SchemaBuilder.int8().defaultValue((byte) 42).build()); assertIdenticalAfterPreProcess(SchemaBuilder.int16().defaultValue((short) 42).build()); assertIdenticalAfterPreProcess(SchemaBuilder.int32().defaultValue(42).build()); assertIdenticalAfterPreProcess(SchemaBuilder.int64().defaultValue(42L).build()); assertIdenticalAfterPreProcess(SchemaBuilder.float32().defaultValue(42.0f).build()); assertIdenticalAfterPreProcess(SchemaBuilder.float64().defaultValue(42.0d).build()); assertIdenticalAfterPreProcess(SchemaBuilder.bool().defaultValue(true).build()); assertIdenticalAfterPreProcess(SchemaBuilder.string().defaultValue("foo").build()); assertIdenticalAfterPreProcess(SchemaBuilder.bytes().defaultValue(new byte[0]).build()); } private void assertIdenticalAfterPreProcess(Schema schema) { assertEquals(schema, DataConverter.preProcessSchema(schema)); } @Test Loading @@ -44,7 +75,20 @@ public class DataConverterTest { Schema origSchema = Decimal.schema(2); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals(Schema.FLOAT64_SCHEMA, preProcessedSchema); assertEquals(0.02, DataConverter.preProcessValue(new BigDecimal("0.02"), origSchema, preProcessedSchema)); // optional assertEquals( Schema.OPTIONAL_FLOAT64_SCHEMA, DataConverter.preProcessSchema(Decimal.builder(2).optional().build()) ); // defval assertEquals( SchemaBuilder.float64().defaultValue(0.00).build(), DataConverter.preProcessSchema(Decimal.builder(2).defaultValue(new BigDecimal("0.00")).build()) ); } @Test Loading @@ -52,10 +96,23 @@ public class DataConverterTest { Schema origSchema = SchemaBuilder.array(Decimal.schema(2)).schema(); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), preProcessedSchema); assertEquals( Arrays.asList(0.02, 0.42), DataConverter.preProcessValue(Arrays.asList(new BigDecimal("0.02"), new BigDecimal("0.42")), origSchema, preProcessedSchema) ); // optional assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(), DataConverter.preProcessSchema(SchemaBuilder.array(Decimal.schema(2)).optional().build()) ); // defval assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(), DataConverter.preProcessSchema(SchemaBuilder.array(Decimal.schema(2)).defaultValue(Collections.emptyList()).build()) ); } @Test Loading Loading @@ -86,20 +143,39 @@ public class DataConverterTest { )), new HashSet<>((List) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema)) ); // optional assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(), DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).optional().build()) ); // defval assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(), DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build()) ); } @Test public void struct() { Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).schema(); Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).build(); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals( SchemaBuilder.struct().name("struct").field("decimal", Schema.FLOAT64_SCHEMA).build(), preProcessedSchema ); assertEquals( new Struct(preProcessedSchema).put("decimal", 0.02), DataConverter.preProcessValue(new Struct(origSchema).put("decimal", new BigDecimal("0.02")), origSchema, preProcessedSchema) ); // optional assertEquals( SchemaBuilder.struct().name("struct").field("decimal", Schema.FLOAT64_SCHEMA).optional().build(), DataConverter.preProcessSchema(SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).optional().build()) ); } }