Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +17 −4 Original line number Diff line number Diff line Loading @@ -36,6 +36,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY; Loading Loading @@ -130,7 +131,11 @@ public class DataConverter { Schema valueSchema = schema.valueSchema(); String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name(); String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name(); Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) Schema elementSchema; if (keySchema.type() == Schema.Type.STRING) { return SchemaBuilder.map(preProcessSchema(keySchema), preProcessSchema(valueSchema)).build(); } elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) .field(MAP_KEY, preProcessSchema(keySchema)) .field(MAP_VALUE, preProcessSchema(valueSchema)) .build(); Loading Loading @@ -205,6 +210,14 @@ public class DataConverter { ArrayList<Struct> mapStructs = new ArrayList<>(); Map<?, ?> map = (Map<?, ?>) value; Schema newValueSchema = newSchema.valueSchema(); if (keySchema.type() == Schema.Type.STRING) { Map<Object, Object> toReturn = new HashMap<>(); for (Map.Entry<?, ?> entry: map.entrySet()) { toReturn.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()), preProcessValue(entry.getValue(), valueSchema, newSchema.valueSchema())); } return toReturn; } for (Map.Entry<?, ?> entry: map.entrySet()) { Struct mapStruct = new Struct(newValueSchema); mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, newValueSchema.field(MAP_KEY).schema())); Loading src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +29 −9 Original line number Diff line number Diff line Loading @@ -117,12 +117,12 @@ public class DataConverterTest { @Test public void map() { Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).build(); Schema origSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).build(); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals( SchemaBuilder.array( SchemaBuilder.struct().name(Schema.STRING_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME) .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.STRING_SCHEMA) SchemaBuilder.struct().name(Schema.INT32_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME) .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.INT32_SCHEMA) .field(ElasticsearchSinkConnectorConstants.MAP_VALUE, Schema.FLOAT64_SCHEMA) .build() ).build(), Loading @@ -130,15 +130,15 @@ public class DataConverterTest { ); Map<Object, Object> origValue = new HashMap<>(); origValue.put("field1", new BigDecimal("0.02")); origValue.put("field2", new BigDecimal("0.42")); origValue.put(1, new BigDecimal("0.02")); origValue.put(2, new BigDecimal("0.42")); assertEquals( new HashSet<>(Arrays.asList( new Struct(preProcessedSchema.valueSchema()) .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field1") .put(ElasticsearchSinkConnectorConstants.MAP_KEY, 1) .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02), new Struct(preProcessedSchema.valueSchema()) .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field2") .put(ElasticsearchSinkConnectorConstants.MAP_KEY, 2) .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.42) )), new HashSet<>((List) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema)) Loading @@ -147,16 +147,36 @@ public class DataConverterTest { // optional assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(), DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).optional().build()) DataConverter.preProcessSchema(SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).optional().build()) ); // defval assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(), DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build()) DataConverter.preProcessSchema(SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build()) ); } @Test public void stringKeyedMap() { Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals( SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), preProcessedSchema ); Map<Object, Object> origValue = new HashMap<>(); origValue.put("field1", 1); origValue.put("field2", 2); HashMap newValue = (HashMap) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema); assertEquals( origValue, newValue ); } @Test public void struct() { Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).build(); Loading Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +17 −4 Original line number Diff line number Diff line Loading @@ -36,6 +36,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY; Loading Loading @@ -130,7 +131,11 @@ public class DataConverter { Schema valueSchema = schema.valueSchema(); String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name(); String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name(); Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) Schema elementSchema; if (keySchema.type() == Schema.Type.STRING) { return SchemaBuilder.map(preProcessSchema(keySchema), preProcessSchema(valueSchema)).build(); } elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) .field(MAP_KEY, preProcessSchema(keySchema)) .field(MAP_VALUE, preProcessSchema(valueSchema)) .build(); Loading Loading @@ -205,6 +210,14 @@ public class DataConverter { ArrayList<Struct> mapStructs = new ArrayList<>(); Map<?, ?> map = (Map<?, ?>) value; Schema newValueSchema = newSchema.valueSchema(); if (keySchema.type() == Schema.Type.STRING) { Map<Object, Object> toReturn = new HashMap<>(); for (Map.Entry<?, ?> entry: map.entrySet()) { toReturn.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()), preProcessValue(entry.getValue(), valueSchema, newSchema.valueSchema())); } return toReturn; } for (Map.Entry<?, ?> entry: map.entrySet()) { Struct mapStruct = new Struct(newValueSchema); mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, newValueSchema.field(MAP_KEY).schema())); Loading
src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +29 −9 Original line number Diff line number Diff line Loading @@ -117,12 +117,12 @@ public class DataConverterTest { @Test public void map() { Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).build(); Schema origSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).build(); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals( SchemaBuilder.array( SchemaBuilder.struct().name(Schema.STRING_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME) .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.STRING_SCHEMA) SchemaBuilder.struct().name(Schema.INT32_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME) .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.INT32_SCHEMA) .field(ElasticsearchSinkConnectorConstants.MAP_VALUE, Schema.FLOAT64_SCHEMA) .build() ).build(), Loading @@ -130,15 +130,15 @@ public class DataConverterTest { ); Map<Object, Object> origValue = new HashMap<>(); origValue.put("field1", new BigDecimal("0.02")); origValue.put("field2", new BigDecimal("0.42")); origValue.put(1, new BigDecimal("0.02")); origValue.put(2, new BigDecimal("0.42")); assertEquals( new HashSet<>(Arrays.asList( new Struct(preProcessedSchema.valueSchema()) .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field1") .put(ElasticsearchSinkConnectorConstants.MAP_KEY, 1) .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02), new Struct(preProcessedSchema.valueSchema()) .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field2") .put(ElasticsearchSinkConnectorConstants.MAP_KEY, 2) .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.42) )), new HashSet<>((List) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema)) Loading @@ -147,16 +147,36 @@ public class DataConverterTest { // optional assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(), DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).optional().build()) DataConverter.preProcessSchema(SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).optional().build()) ); // defval assertEquals( SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(), DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build()) DataConverter.preProcessSchema(SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build()) ); } @Test public void stringKeyedMap() { Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(); Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema); assertEquals( SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), preProcessedSchema ); Map<Object, Object> origValue = new HashMap<>(); origValue.put("field1", 1); origValue.put("field2", 2); HashMap newValue = (HashMap) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema); assertEquals( origValue, newValue ); } @Test public void struct() { Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).build(); Loading