Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +4 −5 Original line number Diff line number Diff line Loading @@ -235,7 +235,8 @@ public class DataConverter { Schema preprocessedKeySchema = preProcessSchema(keySchema); Schema preprocessedValueSchema = preProcessSchema(valueSchema); if (useCompactMapEntries && keySchema.type() == Schema.Type.STRING) { return SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema).build(); SchemaBuilder result = SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema); return copySchemaBasics(schema, result).build(); } Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) .field(MAP_KEY, preprocessedKeySchema) Loading Loading @@ -269,11 +270,9 @@ public class DataConverter { if (schema == null) { return value; } if (value == null) { Object result = preProcessNullValue(schema); if (result != null) { return result; } return preProcessNullValue(schema); } // Handle logical types Loading src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +33 −4 Original line number Diff line number Diff line Loading @@ -16,10 +16,7 @@ package io.confluent.connect.elasticsearch; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Before; Loading Loading @@ -263,6 +260,38 @@ public class DataConverterTest { ); } @Test public void optionalFieldsWithoutDefaults() { // One primitive type should be enough testOptionalFieldWithoutDefault(SchemaBuilder.bool()); // Logical types testOptionalFieldWithoutDefault(Decimal.builder(2)); testOptionalFieldWithoutDefault(Time.builder()); testOptionalFieldWithoutDefault(Timestamp.builder()); // Complex types testOptionalFieldWithoutDefault(SchemaBuilder.array(Schema.BOOLEAN_SCHEMA)); testOptionalFieldWithoutDefault(SchemaBuilder.struct().field("innerField", Schema.BOOLEAN_SCHEMA)); testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)); // Have to test maps with useCompactMapEntries set to true and set to false converter = new DataConverter(false, BehaviorOnNullValues.DEFAULT); testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)); } private void testOptionalFieldWithoutDefault( SchemaBuilder optionalFieldSchema ) { Schema origSchema = SchemaBuilder.struct().name("struct").field( "optionalField", optionalFieldSchema.optional().build() ).build(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); Object preProcessedValue = converter.preProcessValue( new Struct(origSchema).put("optionalField", null), origSchema, preProcessedSchema ); assertEquals(new Struct(preProcessedSchema).put("optionalField", null), preProcessedValue); } @Test public void ignoreOnNullValue() { converter = new DataConverter(true, BehaviorOnNullValues.IGNORE); Loading Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +4 −5 Original line number Diff line number Diff line Loading @@ -235,7 +235,8 @@ public class DataConverter { Schema preprocessedKeySchema = preProcessSchema(keySchema); Schema preprocessedValueSchema = preProcessSchema(valueSchema); if (useCompactMapEntries && keySchema.type() == Schema.Type.STRING) { return SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema).build(); SchemaBuilder result = SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema); return copySchemaBasics(schema, result).build(); } Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) .field(MAP_KEY, preprocessedKeySchema) Loading Loading @@ -269,11 +270,9 @@ public class DataConverter { if (schema == null) { return value; } if (value == null) { Object result = preProcessNullValue(schema); if (result != null) { return result; } return preProcessNullValue(schema); } // Handle logical types Loading
src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +33 −4 Original line number Diff line number Diff line Loading @@ -16,10 +16,7 @@ package io.confluent.connect.elasticsearch; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Before; Loading Loading @@ -263,6 +260,38 @@ public class DataConverterTest { ); } @Test public void optionalFieldsWithoutDefaults() { // One primitive type should be enough testOptionalFieldWithoutDefault(SchemaBuilder.bool()); // Logical types testOptionalFieldWithoutDefault(Decimal.builder(2)); testOptionalFieldWithoutDefault(Time.builder()); testOptionalFieldWithoutDefault(Timestamp.builder()); // Complex types testOptionalFieldWithoutDefault(SchemaBuilder.array(Schema.BOOLEAN_SCHEMA)); testOptionalFieldWithoutDefault(SchemaBuilder.struct().field("innerField", Schema.BOOLEAN_SCHEMA)); testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)); // Have to test maps with useCompactMapEntries set to true and set to false converter = new DataConverter(false, BehaviorOnNullValues.DEFAULT); testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)); } private void testOptionalFieldWithoutDefault( SchemaBuilder optionalFieldSchema ) { Schema origSchema = SchemaBuilder.struct().name("struct").field( "optionalField", optionalFieldSchema.optional().build() ).build(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); Object preProcessedValue = converter.preProcessValue( new Struct(origSchema).put("optionalField", null), origSchema, preProcessedSchema ); assertEquals(new Struct(preProcessedSchema).put("optionalField", null), preProcessedValue); } @Test public void ignoreOnNullValue() { converter = new DataConverter(true, BehaviorOnNullValues.IGNORE); Loading