Loading src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConstants.java +2 −1 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ public class ElasticsearchSinkConnectorConstants { public static final String BOOLEAN_TYPE = "boolean"; public static final String BYTE_TYPE = "byte"; public static final String BINARY_TYPE = "binary"; public static final String SHORT_TYPE = "short"; public static final String INTEGER_TYPE = "integer"; public static final String LONG_TYPE = "long"; Loading @@ -46,6 +47,6 @@ public class ElasticsearchSinkConnectorConstants { TYPES.put(Type.FLOAT32, FLOAT_TYPE); TYPES.put(Type.FLOAT64, DOUBLE_TYPE); TYPES.put(Type.STRING, STRING_TYPE); TYPES.put(Type.BYTES, BYTE_TYPE); TYPES.put(Type.BYTES, BINARY_TYPE); } } src/main/java/io/confluent/connect/elasticsearch/Mapping.java +17 −0 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import io.searchbox.client.JestClient; import io.searchbox.client.JestResult; Loading Loading @@ -164,6 +165,9 @@ public class Mapping { case ElasticsearchSinkConnectorConstants.STRING_TYPE: defaultValueNode = JsonNodeFactory.instance.textNode((String) defaultValue); break; case ElasticsearchSinkConnectorConstants.BINARY_TYPE: defaultValueNode = JsonNodeFactory.instance.binaryNode(bytes(defaultValue)); break; case ElasticsearchSinkConnectorConstants.BOOLEAN_TYPE: defaultValueNode = JsonNodeFactory.instance.booleanNode((boolean) defaultValue); break; Loading @@ -179,4 +183,17 @@ public class Mapping { } return obj; } private static byte[] bytes(Object value) { final byte[] bytes; if (value instanceof ByteBuffer) { final ByteBuffer buffer = ((ByteBuffer) value).slice(); bytes = new byte[buffer.remaining()]; buffer.get(bytes); } else { bytes = (byte[]) value; } return bytes; } } src/test/java/io/confluent/connect/elasticsearch/ElasticsearchWriterTest.java +21 −0 Original line number Diff line number Diff line Loading @@ -256,6 +256,27 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { verifySearchResults(records, ignoreKey, ignoreSchema); } @Test public void testBytes() throws Exception { final boolean ignoreKey = false; final boolean ignoreSchema = false; Schema structSchema = SchemaBuilder.struct().name("struct") .field("bytes", SchemaBuilder.BYTES_SCHEMA) .build(); Struct struct = new Struct(structSchema); struct.put("bytes", new byte[]{42}); Collection<SinkRecord> records = new ArrayList<>(); SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0); records.add(sinkRecord); ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema); writeDataAndRefresh(writer, records); verifySearchResults(records, ignoreKey, ignoreSchema); } private Collection<SinkRecord> prepareData(int numRecords) { Collection<SinkRecord> records = new ArrayList<>(); for (int i = 0; i < numRecords; ++i) { Loading Loading
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConstants.java +2 −1 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ public class ElasticsearchSinkConnectorConstants { public static final String BOOLEAN_TYPE = "boolean"; public static final String BYTE_TYPE = "byte"; public static final String BINARY_TYPE = "binary"; public static final String SHORT_TYPE = "short"; public static final String INTEGER_TYPE = "integer"; public static final String LONG_TYPE = "long"; Loading @@ -46,6 +47,6 @@ public class ElasticsearchSinkConnectorConstants { TYPES.put(Type.FLOAT32, FLOAT_TYPE); TYPES.put(Type.FLOAT64, DOUBLE_TYPE); TYPES.put(Type.STRING, STRING_TYPE); TYPES.put(Type.BYTES, BYTE_TYPE); TYPES.put(Type.BYTES, BINARY_TYPE); } }
src/main/java/io/confluent/connect/elasticsearch/Mapping.java +17 −0 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import io.searchbox.client.JestClient; import io.searchbox.client.JestResult; Loading Loading @@ -164,6 +165,9 @@ public class Mapping { case ElasticsearchSinkConnectorConstants.STRING_TYPE: defaultValueNode = JsonNodeFactory.instance.textNode((String) defaultValue); break; case ElasticsearchSinkConnectorConstants.BINARY_TYPE: defaultValueNode = JsonNodeFactory.instance.binaryNode(bytes(defaultValue)); break; case ElasticsearchSinkConnectorConstants.BOOLEAN_TYPE: defaultValueNode = JsonNodeFactory.instance.booleanNode((boolean) defaultValue); break; Loading @@ -179,4 +183,17 @@ public class Mapping { } return obj; } private static byte[] bytes(Object value) { final byte[] bytes; if (value instanceof ByteBuffer) { final ByteBuffer buffer = ((ByteBuffer) value).slice(); bytes = new byte[buffer.remaining()]; buffer.get(bytes); } else { bytes = (byte[]) value; } return bytes; } }
src/test/java/io/confluent/connect/elasticsearch/ElasticsearchWriterTest.java +21 −0 Original line number Diff line number Diff line Loading @@ -256,6 +256,27 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase { verifySearchResults(records, ignoreKey, ignoreSchema); } @Test public void testBytes() throws Exception { final boolean ignoreKey = false; final boolean ignoreSchema = false; Schema structSchema = SchemaBuilder.struct().name("struct") .field("bytes", SchemaBuilder.BYTES_SCHEMA) .build(); Struct struct = new Struct(structSchema); struct.put("bytes", new byte[]{42}); Collection<SinkRecord> records = new ArrayList<>(); SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0); records.add(sinkRecord); ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema); writeDataAndRefresh(writer, records); verifySearchResults(records, ignoreKey, ignoreSchema); } private Collection<SinkRecord> prepareData(int numRecords) { Collection<SinkRecord> records = new ArrayList<>(); for (int i = 0; i < numRecords; ++i) { Loading