Commit 09f23106 authored by Conor Mongey's avatar Conor Mongey
Browse files

Create indices before writing records

Currently, indices are only created when the connector is opened. If the
destination index is changed by a Single Message Transform, the index
may not exist when the record is being written, and as such will fail
to create a mapping, as the index does not exist.

This patch creates the index, after the message has been transformed.
parent deacab93
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -45,6 +45,15 @@ public interface ElasticsearchClient extends AutoCloseable {
   */
  void createIndices(Set<String> indices);

  /**
   * Gets the Elasticsearch version.
   *
   * @param index the index check exists
   * @return whether the index exists
   * @throws IOException if the client cannot execute the request
   */
  boolean indexExists(String index);

  /**
   * Creates an explicit mapping.
   *
+5 −0
Original line number Diff line number Diff line
@@ -255,6 +255,11 @@ public class ElasticsearchWriter {

      if (!ignoreSchema && !existingMappings.contains(index)) {
        try {
          if (client.indexExists(index) == false) {
            Set<String> indicies = new HashSet<String>();
            indicies.add(index);
            client.createIndices(indicies);
          }
          if (Mapping.getMapping(client, index, type) == null) {
            Mapping.createMapping(client, index, type, sinkRecord.valueSchema());
          }
+1 −1
Original line number Diff line number Diff line
@@ -215,7 +215,7 @@ public class JestElasticsearchClient implements ElasticsearchClient {
    return version;
  }

  private boolean indexExists(String index) {
  public boolean indexExists(String index) {
    Action<JestResult> action = new IndicesExists.Builder(index).build();
    try {
      JestResult result = client.execute(action);
+30 −0
Original line number Diff line number Diff line
@@ -111,4 +111,34 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
      task.stop();
    }
  }
  @Test
  public void testCreateAndWriteToIndexCreatedByAnSMT() {
    // If the index mapping is changed by an SMT, the index wouldn't have been created on open()
    InternalTestCluster cluster = ESIntegTestCase.internalCluster();
    cluster.ensureAtLeastNumDataNodes(3);
    Map<String, String> props = createProps();
    ElasticsearchSinkTask task = new ElasticsearchSinkTask();

    String key = "key";
    Schema schema = createSchema();
    Struct record = createRecord(schema);

    SinkRecord sinkRecord = new SinkRecord("IndexThatDoesntExist",
            PARTITION_113,
            Schema.STRING_SCHEMA,
            key,
            schema,
            record,
            0 );

    try {
      task.start(props, client);
      task.open(new HashSet<>());
      task.put(Collections.singleton(sinkRecord));
    } catch (Exception ex) {
      fail(ex.getMessage());
    } finally {
      task.stop();
    }
  }
}