Commit 60b0c521 authored by Dustin Cote's avatar Dustin Cote Committed by GitHub
Browse files

Merge pull request #69 from cotedm/fixmapoutput

Fix map output for String keyed maps
parents 5f9c5047 ff0d3815
Loading
Loading
Loading
Loading
+21 −10
Original line number Diff line number Diff line
@@ -36,6 +36,8 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
@@ -130,6 +132,9 @@ public class DataConverter {
        Schema valueSchema = schema.valueSchema();
        String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name();
        String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name();
        if (keySchema.type() == Schema.Type.STRING) {
          return SchemaBuilder.map(preProcessSchema(keySchema), preProcessSchema(valueSchema)).build();
        }
        Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
             .field(MAP_KEY, preProcessSchema(keySchema))
             .field(MAP_VALUE, preProcessSchema(valueSchema))
@@ -189,22 +194,28 @@ public class DataConverter {
    }

    Schema.Type schemaType = schema.type();
    Schema keySchema;
    Schema valueSchema;
    switch (schemaType) {
      case ARRAY:
        Collection collection = (Collection) value;
        ArrayList<Object> result = new ArrayList<>();
        List<Object> result = new ArrayList<>();
        for (Object element: collection) {
          result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema()));
        }
        return result;
      case MAP:
        keySchema = schema.keySchema();
        valueSchema = schema.valueSchema();
        ArrayList<Struct> mapStructs = new ArrayList<>();
        Map<?, ?> map = (Map<?, ?>) value;
        Schema keySchema = schema.keySchema();
        Schema valueSchema = schema.valueSchema();
        Schema newValueSchema = newSchema.valueSchema();
        Map<?, ?> map = (Map<?, ?>) value;
        if (keySchema.type() == Schema.Type.STRING) {
          Map<Object, Object> processedMap = new HashMap<>();
          for (Map.Entry<?, ?> entry: map.entrySet()) {
            processedMap.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()),
                preProcessValue(entry.getValue(), valueSchema, newValueSchema));
          }
          return processedMap;
        }
        List<Struct> mapStructs = new ArrayList<>();
        for (Map.Entry<?, ?> entry: map.entrySet()) {
          Struct mapStruct = new Struct(newValueSchema);
          mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, newValueSchema.field(MAP_KEY).schema()));
+29 −9
Original line number Diff line number Diff line
@@ -117,12 +117,12 @@ public class DataConverterTest {

  @Test
  public void map() {
    Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).build();
    Schema origSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).build();
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(
        SchemaBuilder.array(
            SchemaBuilder.struct().name(Schema.STRING_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME)
                .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.STRING_SCHEMA)
            SchemaBuilder.struct().name(Schema.INT32_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME)
                .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.INT32_SCHEMA)
                .field(ElasticsearchSinkConnectorConstants.MAP_VALUE, Schema.FLOAT64_SCHEMA)
                .build()
        ).build(),
@@ -130,15 +130,15 @@ public class DataConverterTest {
    );

    Map<Object, Object> origValue = new HashMap<>();
    origValue.put("field1", new BigDecimal("0.02"));
    origValue.put("field2", new BigDecimal("0.42"));
    origValue.put(1, new BigDecimal("0.02"));
    origValue.put(2, new BigDecimal("0.42"));
    assertEquals(
        new HashSet<>(Arrays.asList(
            new Struct(preProcessedSchema.valueSchema())
                .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field1")
                .put(ElasticsearchSinkConnectorConstants.MAP_KEY, 1)
                .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02),
            new Struct(preProcessedSchema.valueSchema())
                .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field2")
                .put(ElasticsearchSinkConnectorConstants.MAP_KEY, 2)
                .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.42)
        )),
        new HashSet<>((List) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema))
@@ -147,16 +147,36 @@ public class DataConverterTest {
    // optional
    assertEquals(
        SchemaBuilder.array(preProcessedSchema.valueSchema()).optional().build(),
        DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).optional().build())
        DataConverter.preProcessSchema(SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).optional().build())
    );

    // defval
    assertEquals(
        SchemaBuilder.array(preProcessedSchema.valueSchema()).defaultValue(Collections.emptyList()).build(),
        DataConverter.preProcessSchema(SchemaBuilder.map(Schema.STRING_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build())
        DataConverter.preProcessSchema(SchemaBuilder.map(Schema.INT32_SCHEMA, Decimal.schema(2)).defaultValue(Collections.emptyMap()).build())
    );
  }

  @Test
  public void stringKeyedMap() {
    Schema origSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build();
    Schema preProcessedSchema = DataConverter.preProcessSchema(origSchema);
    assertEquals(
            SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(),
            preProcessedSchema
    );

    Map<Object, Object> origValue = new HashMap<>();
    origValue.put("field1", 1);
    origValue.put("field2", 2);
    HashMap newValue = (HashMap) DataConverter.preProcessValue(origValue, origSchema, preProcessedSchema);
    assertEquals(
            origValue,
            newValue
    );

  }

  @Test
  public void struct() {
    Schema origSchema = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).build();
+8 −4
Original line number Diff line number Diff line
@@ -113,7 +113,7 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase {
    verifySearchResults(records, TOPIC, ignoreKey, ignoreSchema);
  }

  protected void verifySearchResults(Collection<SinkRecord> records, String index, boolean ignoreKey, boolean ignoreSchema) throws IOException {
  protected void verifySearchResults(Collection<?> records, String index, boolean ignoreKey, boolean ignoreSchema) throws IOException {
    final SearchResult result = client.execute(new Search.Builder("").addIndex(index).build());

    final JsonArray rawHits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
@@ -128,9 +128,13 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase {
      hits.put(id, source);
    }

    for (SinkRecord record : records) {
      final IndexableRecord indexableRecord = DataConverter.convertRecord(record, index, TYPE, ignoreKey, ignoreSchema);
    for (Object record : records) {
      if (record instanceof SinkRecord) {
        IndexableRecord indexableRecord = DataConverter.convertRecord((SinkRecord) record, index, TYPE, ignoreKey, ignoreSchema);
        assertEquals(indexableRecord.payload, hits.get(indexableRecord.key.id));
      } else {
        assertEquals(record, hits.get("key"));
      }
    }
  }

+21 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
package io.confluent.connect.elasticsearch;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
@@ -231,6 +232,26 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    verifySearchResults(records, ignoreKey, ignoreSchema);
  }

  @Test
  public void testStringKeyedMap() throws Exception {
    boolean ignoreKey = false;
    boolean ignoreSchema = false;

    Schema mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build();

    Map<String, Integer> map = new HashMap<>();
    map.put("One", 1);
    map.put("Two", 2);

    SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, mapSchema, map, 0);

    ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema);
    writeDataAndRefresh(writer, Collections.singletonList(sinkRecord));

    Collection<?> expectedRecords = Collections.singletonList(new ObjectMapper().writeValueAsString(map));
    verifySearchResults(expectedRecords, TOPIC, ignoreKey, ignoreSchema);
  }

  @Test
  public void testDecimal() throws Exception {
    final boolean ignoreKey = false;
+3 −4
Original line number Diff line number Diff line
@@ -29,9 +29,6 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;

import io.searchbox.client.JestResult;
import io.searchbox.indices.mapping.GetMapping;

public class MappingTest extends ElasticsearchSinkTestBase {

  private static final String INDEX = "kafka-connect";
@@ -119,7 +116,9 @@ public class MappingTest extends ElasticsearchSinkTestBase {
        break;
      case MAP:
        Schema newSchema = DataConverter.preProcessSchema(schema);
        verifyMapping(newSchema, mapping);
        JsonObject mapProperties = mapping.get("properties").getAsJsonObject();
        verifyMapping(newSchema.keySchema(), mapProperties.get(ElasticsearchSinkConnectorConstants.MAP_KEY).getAsJsonObject());
        verifyMapping(newSchema.valueSchema(), mapProperties.get(ElasticsearchSinkConnectorConstants.MAP_VALUE).getAsJsonObject());
        break;
      case STRUCT:
        JsonObject properties = mapping.get("properties").getAsJsonObject();