Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +9 −0 Original line number Diff line number Diff line Loading @@ -324,6 +324,9 @@ public class DataConverter { } private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) { if (value == null) { return null; } Collection collection = (Collection) value; List<Object> result = new ArrayList<>(); for (Object element: collection) { Loading @@ -333,6 +336,9 @@ public class DataConverter { } private Object preProcessMapValue(Object value, Schema schema, Schema newSchema) { if (value == null) { return null; } Schema keySchema = schema.keySchema(); Schema valueSchema = schema.valueSchema(); Schema newValueSchema = newSchema.valueSchema(); Loading Loading @@ -360,6 +366,9 @@ public class DataConverter { } private Object preProcessStructValue(Object value, Schema schema, Schema newSchema) { if (value == null) { return null; } Struct struct = (Struct) value; Struct newStruct = new Struct(newSchema); for (Field field : schema.fields()) { Loading src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +36 −0 Original line number Diff line number Diff line Loading @@ -147,6 +147,18 @@ public class DataConverterTest { ); } @Test public void nullArray() { // Create optional schema with no default value Schema origSchema = SchemaBuilder.array(Decimal.schema(2)).optional().schema(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertEquals( null, converter.preProcessValue(null, origSchema, preProcessedSchema) ); } @Test public void map() { Schema origSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).build(); Loading Loading @@ -189,6 +201,18 @@ public class DataConverterTest { ); } @Test public void nullMap() { // Create optional schema with no default value Schema origSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).optional().build(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertEquals( null, converter.preProcessValue(null, origSchema, preProcessedSchema) ); } @Test public void stringKeyedMapNonCompactFormat() { Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(); Loading Loading @@ -263,6 +287,18 @@ public class DataConverterTest { ); } @Test public void nullStruct() { // Create optional schema with no default value Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).optional().build(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertEquals( null, converter.preProcessValue(null, origSchema, preProcessedSchema) ); } @Test public void ignoreOnNullValue() { converter = new DataConverter(true, BehaviorOnNullValues.IGNORE); Loading Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +9 −0 Original line number Diff line number Diff line Loading @@ -324,6 +324,9 @@ public class DataConverter { } private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) { if (value == null) { return null; } Collection collection = (Collection) value; List<Object> result = new ArrayList<>(); for (Object element: collection) { Loading @@ -333,6 +336,9 @@ public class DataConverter { } private Object preProcessMapValue(Object value, Schema schema, Schema newSchema) { if (value == null) { return null; } Schema keySchema = schema.keySchema(); Schema valueSchema = schema.valueSchema(); Schema newValueSchema = newSchema.valueSchema(); Loading Loading @@ -360,6 +366,9 @@ public class DataConverter { } private Object preProcessStructValue(Object value, Schema schema, Schema newSchema) { if (value == null) { return null; } Struct struct = (Struct) value; Struct newStruct = new Struct(newSchema); for (Field field : schema.fields()) { Loading
src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +36 −0 Original line number Diff line number Diff line Loading @@ -147,6 +147,18 @@ public class DataConverterTest { ); } @Test public void nullArray() { // Create optional schema with no default value Schema origSchema = SchemaBuilder.array(Decimal.schema(2)).optional().schema(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertEquals( null, converter.preProcessValue(null, origSchema, preProcessedSchema) ); } @Test public void map() { Schema origSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).build(); Loading Loading @@ -189,6 +201,18 @@ public class DataConverterTest { ); } @Test public void nullMap() { // Create optional schema with no default value Schema origSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).optional().build(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertEquals( null, converter.preProcessValue(null, origSchema, preProcessedSchema) ); } @Test public void stringKeyedMapNonCompactFormat() { Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(); Loading Loading @@ -263,6 +287,18 @@ public class DataConverterTest { ); } @Test public void nullStruct() { // Create optional schema with no default value Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).optional().build(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertEquals( null, converter.preProcessValue(null, origSchema, preProcessedSchema) ); } @Test public void ignoreOnNullValue() { converter = new DataConverter(true, BehaviorOnNullValues.IGNORE); Loading