Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +12 −15 Original line number Diff line number Diff line Loading @@ -131,11 +131,10 @@ public class DataConverter { Schema valueSchema = schema.valueSchema(); String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name(); String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name(); Schema elementSchema; if (keySchema.type() == Schema.Type.STRING) { return SchemaBuilder.map(preProcessSchema(keySchema), preProcessSchema(valueSchema)).build(); } elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) .field(MAP_KEY, preProcessSchema(keySchema)) .field(MAP_VALUE, preProcessSchema(valueSchema)) .build(); Loading Loading @@ -194,8 +193,6 @@ public class DataConverter { } Schema.Type schemaType = schema.type(); Schema keySchema; Schema valueSchema; switch (schemaType) { case ARRAY: Collection collection = (Collection) value; Loading @@ -205,19 +202,19 @@ public class DataConverter { } return result; case MAP: keySchema = schema.keySchema(); valueSchema = schema.valueSchema(); ArrayList<Struct> mapStructs = new ArrayList<>(); Map<?, ?> map = (Map<?, ?>) value; Schema keySchema = schema.keySchema(); Schema newValueSchema = newSchema.valueSchema(); Schema valueSchema = schema.valueSchema(); Map<?, ?> map = (Map<?, ?>) value; if (keySchema.type() == Schema.Type.STRING) { Map<Object, Object> toReturn = new HashMap<>(); Map<Object, Object> processedMap = new HashMap<>(); for (Map.Entry<?, ?> entry: map.entrySet()) { toReturn.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()), preProcessValue(entry.getValue(), valueSchema, newSchema.valueSchema())); processedMap.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()), preProcessValue(entry.getValue(), valueSchema, newValueSchema)); } return toReturn; return processedMap; } ArrayList<Struct> mapStructs = new ArrayList<>(); for (Map.Entry<?, ?> entry: map.entrySet()) { Struct mapStruct = new Struct(newValueSchema); mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, newValueSchema.field(MAP_KEY).schema())); Loading src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTestBase.java +8 −4 Original line number Diff line number Diff line Loading @@ -113,7 +113,7 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase { verifySearchResults(records, TOPIC, ignoreKey, ignoreSchema); } protected void verifySearchResults(Collection<SinkRecord> records, String index, boolean ignoreKey, boolean ignoreSchema) throws IOException { protected void verifySearchResults(Collection<?> records, String index, boolean ignoreKey, boolean ignoreSchema) throws IOException { final SearchResult result = client.execute(new Search.Builder("").addIndex(index).build()); final JsonArray rawHits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits"); Loading @@ -128,9 +128,13 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase { hits.put(id, source); } for (SinkRecord record : records) { final IndexableRecord indexableRecord = DataConverter.convertRecord(record, index, TYPE, ignoreKey, ignoreSchema); for (Object record : records) { if (record instanceof SinkRecord) { IndexableRecord indexableRecord = DataConverter.convertRecord((SinkRecord) record, index, TYPE, ignoreKey, ignoreSchema); assertEquals(indexableRecord.payload, hits.get(indexableRecord.key.id)); } else { assertEquals(record, hits.get("key")); } } } Loading src/test/java/io/confluent/connect/elasticsearch/ElasticsearchWriterTest.java +21 −0 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ package io.confluent.connect.elasticsearch; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; Loading Loading @@ -231,6 +232,26 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { verifySearchResults(records, ignoreKey, ignoreSchema); } @Test public void testStringKeyedMap() throws Exception { boolean ignoreKey = false; boolean ignoreSchema = false; Schema mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(); Map<String, Integer> map = new HashMap<>(); map.put("One", 1); map.put("Two", 2); SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, mapSchema, map, 0); ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema); writeDataAndRefresh(writer, Collections.singletonList(sinkRecord)); Collection<?> expectedRecords = Collections.singletonList(new ObjectMapper().writeValueAsString(map)); verifySearchResults(expectedRecords, TOPIC, ignoreKey, ignoreSchema); } @Test public void testDecimal() throws Exception { final boolean ignoreKey = false; Loading src/test/java/io/confluent/connect/elasticsearch/MappingTest.java +3 −4 Original line number Diff line number Diff line Loading @@ -29,9 +29,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.junit.Test; import io.searchbox.client.JestResult; import io.searchbox.indices.mapping.GetMapping; public class MappingTest extends ElasticsearchSinkTestBase { private static final String INDEX = "kafka-connect"; Loading Loading @@ -119,7 +116,9 @@ public class MappingTest extends ElasticsearchSinkTestBase { break; case MAP: Schema newSchema = DataConverter.preProcessSchema(schema); verifyMapping(newSchema, mapping); JsonObject mapProperties = mapping.get("properties").getAsJsonObject(); verifyMapping(newSchema.keySchema(), mapProperties.get(ElasticsearchSinkConnectorConstants.MAP_KEY).getAsJsonObject()); verifyMapping(newSchema.valueSchema(), mapProperties.get(ElasticsearchSinkConnectorConstants.MAP_VALUE).getAsJsonObject()); break; case STRUCT: JsonObject properties = mapping.get("properties").getAsJsonObject(); Loading Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +12 −15 Original line number Diff line number Diff line Loading @@ -131,11 +131,10 @@ public class DataConverter { Schema valueSchema = schema.valueSchema(); String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name(); String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name(); Schema elementSchema; if (keySchema.type() == Schema.Type.STRING) { return SchemaBuilder.map(preProcessSchema(keySchema), preProcessSchema(valueSchema)).build(); } elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) .field(MAP_KEY, preProcessSchema(keySchema)) .field(MAP_VALUE, preProcessSchema(valueSchema)) .build(); Loading Loading @@ -194,8 +193,6 @@ public class DataConverter { } Schema.Type schemaType = schema.type(); Schema keySchema; Schema valueSchema; switch (schemaType) { case ARRAY: Collection collection = (Collection) value; Loading @@ -205,19 +202,19 @@ public class DataConverter { } return result; case MAP: keySchema = schema.keySchema(); valueSchema = schema.valueSchema(); ArrayList<Struct> mapStructs = new ArrayList<>(); Map<?, ?> map = (Map<?, ?>) value; Schema keySchema = schema.keySchema(); Schema newValueSchema = newSchema.valueSchema(); Schema valueSchema = schema.valueSchema(); Map<?, ?> map = (Map<?, ?>) value; if (keySchema.type() == Schema.Type.STRING) { Map<Object, Object> toReturn = new HashMap<>(); Map<Object, Object> processedMap = new HashMap<>(); for (Map.Entry<?, ?> entry: map.entrySet()) { toReturn.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()), preProcessValue(entry.getValue(), valueSchema, newSchema.valueSchema())); processedMap.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()), preProcessValue(entry.getValue(), valueSchema, newValueSchema)); } return toReturn; return processedMap; } ArrayList<Struct> mapStructs = new ArrayList<>(); for (Map.Entry<?, ?> entry: map.entrySet()) { Struct mapStruct = new Struct(newValueSchema); mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, newValueSchema.field(MAP_KEY).schema())); Loading
src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTestBase.java +8 −4 Original line number Diff line number Diff line Loading @@ -113,7 +113,7 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase { verifySearchResults(records, TOPIC, ignoreKey, ignoreSchema); } protected void verifySearchResults(Collection<SinkRecord> records, String index, boolean ignoreKey, boolean ignoreSchema) throws IOException { protected void verifySearchResults(Collection<?> records, String index, boolean ignoreKey, boolean ignoreSchema) throws IOException { final SearchResult result = client.execute(new Search.Builder("").addIndex(index).build()); final JsonArray rawHits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits"); Loading @@ -128,9 +128,13 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase { hits.put(id, source); } for (SinkRecord record : records) { final IndexableRecord indexableRecord = DataConverter.convertRecord(record, index, TYPE, ignoreKey, ignoreSchema); for (Object record : records) { if (record instanceof SinkRecord) { IndexableRecord indexableRecord = DataConverter.convertRecord((SinkRecord) record, index, TYPE, ignoreKey, ignoreSchema); assertEquals(indexableRecord.payload, hits.get(indexableRecord.key.id)); } else { assertEquals(record, hits.get("key")); } } } Loading
src/test/java/io/confluent/connect/elasticsearch/ElasticsearchWriterTest.java +21 −0 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ package io.confluent.connect.elasticsearch; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; Loading Loading @@ -231,6 +232,26 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { verifySearchResults(records, ignoreKey, ignoreSchema); } @Test public void testStringKeyedMap() throws Exception { boolean ignoreKey = false; boolean ignoreSchema = false; Schema mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(); Map<String, Integer> map = new HashMap<>(); map.put("One", 1); map.put("Two", 2); SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, mapSchema, map, 0); ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema); writeDataAndRefresh(writer, Collections.singletonList(sinkRecord)); Collection<?> expectedRecords = Collections.singletonList(new ObjectMapper().writeValueAsString(map)); verifySearchResults(expectedRecords, TOPIC, ignoreKey, ignoreSchema); } @Test public void testDecimal() throws Exception { final boolean ignoreKey = false; Loading
src/test/java/io/confluent/connect/elasticsearch/MappingTest.java +3 −4 Original line number Diff line number Diff line Loading @@ -29,9 +29,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.junit.Test; import io.searchbox.client.JestResult; import io.searchbox.indices.mapping.GetMapping; public class MappingTest extends ElasticsearchSinkTestBase { private static final String INDEX = "kafka-connect"; Loading Loading @@ -119,7 +116,9 @@ public class MappingTest extends ElasticsearchSinkTestBase { break; case MAP: Schema newSchema = DataConverter.preProcessSchema(schema); verifyMapping(newSchema, mapping); JsonObject mapProperties = mapping.get("properties").getAsJsonObject(); verifyMapping(newSchema.keySchema(), mapProperties.get(ElasticsearchSinkConnectorConstants.MAP_KEY).getAsJsonObject()); verifyMapping(newSchema.valueSchema(), mapProperties.get(ElasticsearchSinkConnectorConstants.MAP_VALUE).getAsJsonObject()); break; case STRUCT: JsonObject properties = mapping.get("properties").getAsJsonObject(); Loading