Commit 765b1dc2 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

Schema.Type.BYTES should map to 'binary' ES datatype

parent 01ad3b44
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ public class ElasticsearchSinkConnectorConstants {

  public static final String BOOLEAN_TYPE = "boolean";
  public static final String BYTE_TYPE = "byte";
  public static final String BINARY_TYPE = "binary";
  public static final String SHORT_TYPE = "short";
  public static final String INTEGER_TYPE = "integer";
  public static final String LONG_TYPE = "long";
@@ -46,6 +47,6 @@ public class ElasticsearchSinkConnectorConstants {
    TYPES.put(Type.FLOAT32, FLOAT_TYPE);
    TYPES.put(Type.FLOAT64, DOUBLE_TYPE);
    TYPES.put(Type.STRING, STRING_TYPE);
    TYPES.put(Type.BYTES, BYTE_TYPE);
    TYPES.put(Type.BYTES, BINARY_TYPE);
  }
}
+17 −0
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
@@ -166,6 +167,9 @@ public class Mapping {
        case ElasticsearchSinkConnectorConstants.STRING_TYPE:
          defaultValueNode = JsonNodeFactory.instance.textNode((String) defaultValue);
          break;
        case ElasticsearchSinkConnectorConstants.BINARY_TYPE:
          defaultValueNode = JsonNodeFactory.instance.binaryNode(bytes(defaultValue));
          break;
        case ElasticsearchSinkConnectorConstants.BOOLEAN_TYPE:
          defaultValueNode = JsonNodeFactory.instance.booleanNode((boolean) defaultValue);
          break;
@@ -181,4 +185,17 @@ public class Mapping {
    }
    return obj;
  }

  private static byte[] bytes(Object value) {
    final byte[] bytes;
    if (value instanceof ByteBuffer) {
      final ByteBuffer buffer = ((ByteBuffer) value).slice();
      bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
    } else {
      bytes = (byte[]) value;
    }
    return bytes;
  }

}
+21 −0
Original line number Diff line number Diff line
@@ -227,6 +227,27 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    verifySearchResults(records, ignoreKey, ignoreSchema);
  }

  @Test
  public void testBytes() throws Exception {
    final boolean ignoreKey = false;
    final boolean ignoreSchema = false;

    Schema structSchema = SchemaBuilder.struct().name("struct")
        .field("bytes", SchemaBuilder.BYTES_SCHEMA)
        .build();

    Struct struct = new Struct(structSchema);
    struct.put("bytes", new byte[]{42});

    Collection<SinkRecord> records = new ArrayList<>();
    SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0);
    records.add(sinkRecord);

    ElasticsearchWriter writer = initWriter(client, ignoreKey, ignoreSchema);
    writeDataAndRefresh(writer, records);
    verifySearchResults(records, ignoreKey, ignoreSchema);
  }

  private Collection<SinkRecord> prepareData(int numRecords) {
    Collection<SinkRecord> records = new ArrayList<>();
    for (int i = 0; i < numRecords; ++i) {