Commit 8201b3a2 authored by Robert Yokota's avatar Robert Yokota
Browse files

CC-2037 Use keyword subfield with string fields

parent abf31091
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@ import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConst
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.DOUBLE_TYPE;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.FLOAT_TYPE;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.INTEGER_TYPE;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.KEYWORD_TYPE;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.LONG_TYPE;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
@@ -174,6 +175,9 @@ public class Mapping {

    ObjectNode obj = JsonNodeFactory.instance.objectNode();
    obj.set("type", JsonNodeFactory.instance.textNode(type));
    if (type.equals(TEXT_TYPE)) {
      addTextMapping(obj);
    }
    JsonNode defaultValueNode = null;
    if (defaultValue != null) {
      switch (type) {
@@ -218,6 +222,16 @@ public class Mapping {
    return obj;
  }

  private static void addTextMapping(ObjectNode obj) {
    // Add additional mapping for indexing, per https://www.elastic.co/blog/strings-are-dead-long-live-strings
    ObjectNode keyword = JsonNodeFactory.instance.objectNode();
    keyword.set("type", JsonNodeFactory.instance.textNode(KEYWORD_TYPE));
    keyword.set("ignore_above", JsonNodeFactory.instance.numberNode(256));
    ObjectNode fields = JsonNodeFactory.instance.objectNode();
    fields.set("keyword", keyword);
    obj.set("fields", fields);
  }

  private static byte[] bytes(Object value) {
    final byte[] bytes;
    if (value instanceof ByteBuffer) {
+31 −0
Original line number Diff line number Diff line
@@ -16,6 +16,9 @@

package io.confluent.connect.elasticsearch;

import com.fasterxml.jackson.databind.node.NumericNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.gson.JsonObject;

import org.apache.kafka.connect.data.Date;
@@ -30,6 +33,11 @@ import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.KEYWORD_TYPE;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.TEXT_TYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MappingTest extends ElasticsearchSinkTestBase {

@@ -51,6 +59,29 @@ public class MappingTest extends ElasticsearchSinkTestBase {
    verifyMapping(schema, mapping);
  }

  @Test
  @SuppressWarnings("unchecked")
  public void testStringMappingForES6() throws Exception {
    ElasticsearchClient client = mock(ElasticsearchClient.class);
    when(client.getVersion()).thenReturn(ElasticsearchClient.Version.ES_V6);

    Schema schema = SchemaBuilder.struct().name("textRecord")
            .field("string", Schema.STRING_SCHEMA)
            .build();
    ObjectNode mapping = (ObjectNode) Mapping.inferMapping(client, schema);
    ObjectNode properties = mapping.with("properties");
    ObjectNode string = properties.with("string");
    TextNode stringType = (TextNode) string.get("type");
    ObjectNode fields = string.with("fields");
    ObjectNode keyword = fields.with("keyword");
    TextNode keywordType = (TextNode) keyword.get("type");
    NumericNode ignoreAbove = (NumericNode) keyword.get("ignore_above");

    assertEquals(TEXT_TYPE, stringType.asText());
    assertEquals(KEYWORD_TYPE, keywordType.asText());
    assertEquals(256, ignoreAbove.asInt());
  }

  protected Schema createSchema() {
    Schema structSchema = createInnerSchema();
    return SchemaBuilder.struct().name("record")