Loading src/main/java/io/confluent/connect/elasticsearch/Mapping.java +14 −0 Original line number Diff line number Diff line Loading @@ -38,6 +38,7 @@ import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConst import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.DOUBLE_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.FLOAT_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.INTEGER_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.KEYWORD_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.LONG_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE; Loading Loading @@ -174,6 +175,9 @@ public class Mapping { ObjectNode obj = JsonNodeFactory.instance.objectNode(); obj.set("type", JsonNodeFactory.instance.textNode(type)); if (type.equals(TEXT_TYPE)) { addTextMapping(obj); } JsonNode defaultValueNode = null; if (defaultValue != null) { switch (type) { Loading Loading @@ -218,6 +222,16 @@ public class Mapping { return obj; } private static void addTextMapping(ObjectNode obj) { // Add additional mapping for indexing, per https://www.elastic.co/blog/strings-are-dead-long-live-strings ObjectNode keyword = JsonNodeFactory.instance.objectNode(); keyword.set("type", JsonNodeFactory.instance.textNode(KEYWORD_TYPE)); keyword.set("ignore_above", JsonNodeFactory.instance.numberNode(256)); ObjectNode fields = JsonNodeFactory.instance.objectNode(); fields.set("keyword", keyword); obj.set("fields", fields); } private static byte[] bytes(Object value) { final byte[] bytes; if (value instanceof ByteBuffer) { Loading src/test/java/io/confluent/connect/elasticsearch/MappingTest.java +31 −0 Original line number Diff line number Diff line Loading @@ -16,6 +16,9 @@ package io.confluent.connect.elasticsearch; import com.fasterxml.jackson.databind.node.NumericNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.gson.JsonObject; import org.apache.kafka.connect.data.Date; Loading @@ -30,6 +33,11 @@ import org.elasticsearch.test.InternalTestCluster; import org.junit.Test; import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.KEYWORD_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.TEXT_TYPE; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MappingTest extends ElasticsearchSinkTestBase { Loading @@ -51,6 +59,29 @@ public class MappingTest extends ElasticsearchSinkTestBase { verifyMapping(schema, mapping); } @Test @SuppressWarnings("unchecked") public void testStringMappingForES6() throws Exception { ElasticsearchClient client = mock(ElasticsearchClient.class); when(client.getVersion()).thenReturn(ElasticsearchClient.Version.ES_V6); Schema schema = SchemaBuilder.struct().name("textRecord") .field("string", Schema.STRING_SCHEMA) .build(); ObjectNode mapping = (ObjectNode) Mapping.inferMapping(client, schema); ObjectNode properties = mapping.with("properties"); ObjectNode string = properties.with("string"); TextNode stringType = (TextNode) string.get("type"); ObjectNode fields = string.with("fields"); ObjectNode keyword = fields.with("keyword"); TextNode keywordType = (TextNode) keyword.get("type"); NumericNode ignoreAbove = (NumericNode) keyword.get("ignore_above"); assertEquals(TEXT_TYPE, stringType.asText()); assertEquals(KEYWORD_TYPE, keywordType.asText()); assertEquals(256, ignoreAbove.asInt()); } protected Schema createSchema() { Schema structSchema = createInnerSchema(); return SchemaBuilder.struct().name("record") Loading Loading
src/main/java/io/confluent/connect/elasticsearch/Mapping.java +14 −0 Original line number Diff line number Diff line Loading @@ -38,6 +38,7 @@ import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConst import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.DOUBLE_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.FLOAT_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.INTEGER_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.KEYWORD_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.LONG_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE; Loading Loading @@ -174,6 +175,9 @@ public class Mapping { ObjectNode obj = JsonNodeFactory.instance.objectNode(); obj.set("type", JsonNodeFactory.instance.textNode(type)); if (type.equals(TEXT_TYPE)) { addTextMapping(obj); } JsonNode defaultValueNode = null; if (defaultValue != null) { switch (type) { Loading Loading @@ -218,6 +222,16 @@ public class Mapping { return obj; } private static void addTextMapping(ObjectNode obj) { // Add additional mapping for indexing, per https://www.elastic.co/blog/strings-are-dead-long-live-strings ObjectNode keyword = JsonNodeFactory.instance.objectNode(); keyword.set("type", JsonNodeFactory.instance.textNode(KEYWORD_TYPE)); keyword.set("ignore_above", JsonNodeFactory.instance.numberNode(256)); ObjectNode fields = JsonNodeFactory.instance.objectNode(); fields.set("keyword", keyword); obj.set("fields", fields); } private static byte[] bytes(Object value) { final byte[] bytes; if (value instanceof ByteBuffer) { Loading
src/test/java/io/confluent/connect/elasticsearch/MappingTest.java +31 −0 Original line number Diff line number Diff line Loading @@ -16,6 +16,9 @@ package io.confluent.connect.elasticsearch; import com.fasterxml.jackson.databind.node.NumericNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.gson.JsonObject; import org.apache.kafka.connect.data.Date; Loading @@ -30,6 +33,11 @@ import org.elasticsearch.test.InternalTestCluster; import org.junit.Test; import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.KEYWORD_TYPE; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.TEXT_TYPE; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MappingTest extends ElasticsearchSinkTestBase { Loading @@ -51,6 +59,29 @@ public class MappingTest extends ElasticsearchSinkTestBase { verifyMapping(schema, mapping); } @Test @SuppressWarnings("unchecked") public void testStringMappingForES6() throws Exception { ElasticsearchClient client = mock(ElasticsearchClient.class); when(client.getVersion()).thenReturn(ElasticsearchClient.Version.ES_V6); Schema schema = SchemaBuilder.struct().name("textRecord") .field("string", Schema.STRING_SCHEMA) .build(); ObjectNode mapping = (ObjectNode) Mapping.inferMapping(client, schema); ObjectNode properties = mapping.with("properties"); ObjectNode string = properties.with("string"); TextNode stringType = (TextNode) string.get("type"); ObjectNode fields = string.with("fields"); ObjectNode keyword = fields.with("keyword"); TextNode keywordType = (TextNode) keyword.get("type"); NumericNode ignoreAbove = (NumericNode) keyword.get("ignore_above"); assertEquals(TEXT_TYPE, stringType.asText()); assertEquals(KEYWORD_TYPE, keywordType.asText()); assertEquals(256, ignoreAbove.asInt()); } protected Schema createSchema() { Schema structSchema = createInnerSchema(); return SchemaBuilder.struct().name("record") Loading