Commit d544bc34 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

Test improvements - no need to sleep when can refresh after writes

parent 7a024455
Loading
Loading
Loading
Loading
+4 −2
Original line number Diff line number Diff line
@@ -67,7 +67,9 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
    task.put(records);
    task.flush(null);

    Thread.sleep(SLEEP_INTERVAL_MS);
    verifySearch(records, search(client), true);
    refresh();

    verifySearchResults(records, true);
  }

}
+2 −10
Original line number Diff line number Diff line
@@ -23,24 +23,18 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.client.http.JestHttpClient;
@@ -115,11 +109,9 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase {
    return struct;
  }

  protected SearchResult search(JestClient client) throws IOException {
    return client.execute(new Search.Builder("").build());
  }
  protected void verifySearchResults(Collection<SinkRecord> records, boolean ignoreKey) throws IOException {
    SearchResult result = client.execute(new Search.Builder("").build());

  protected void verifySearch(Collection<SinkRecord> records, SearchResult result, boolean ignoreKey) {
    JsonArray hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
    assertEquals(records.size(), hits.size());
    Set<String> hitIds = new HashSet<>();
+16 −16
Original line number Diff line number Diff line
@@ -50,18 +50,18 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  public void testWriter() throws Exception {
    Collection<SinkRecord> records = prepareData(2);
    ElasticsearchWriter writer = initWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());
    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    writeDataAndRefresh(writer, records);

    Collection<SinkRecord> expected = Collections.singletonList(new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1));
    verifySearch(expected, search(client), false);
    verifySearchResults(expected, false);
  }

  @Test
  public void testWriterIgnoreKey() throws Exception {
    Collection<SinkRecord> records = prepareData(2);
    ElasticsearchWriter writer = initWriter(client, true, false, Collections.<String, TopicConfig>emptyMap());
    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    verifySearch(records, search(client), true);
    writeDataAndRefresh(writer, records);
    verifySearchResults(records, true);
  }

  @Test
@@ -69,8 +69,8 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    Collection<SinkRecord> records = prepareData(2);
    ElasticsearchWriter writer = initWriter(client, true, true, Collections.<String, TopicConfig>emptyMap());

    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    verifySearch(records, search(client), true);
    writeDataAndRefresh(writer, records);
    verifySearchResults(records, true);
  }

  @Test
@@ -82,8 +82,8 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    topicConfigs.put(TOPIC, topicConfig);

    ElasticsearchWriter writer = initWriter(client, false, true, topicConfigs);
    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    verifySearch(records, search(client), true);
    writeDataAndRefresh(writer, records);
    verifySearchResults(records, true);
  }

  @Test
@@ -134,8 +134,8 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    records.add(sinkRecord);
    expected.add(sinkRecord);

    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    verifySearch(expected, search(client), true);
    writeDataAndRefresh(writer, records);
    verifySearchResults(expected, true);
  }

  @Test
@@ -157,9 +157,9 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {

    ElasticsearchWriter writer = initWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());

    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    writeDataAndRefresh(writer, records);

    verifySearch(records, search(client), false);
    verifySearchResults(records, false);
  }

  @Test
@@ -181,9 +181,9 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {

    ElasticsearchWriter writer = initWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());

    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    writeDataAndRefresh(writer, records);

    verifySearch(records, search(client), false);
    verifySearchResults(records, false);
  }

  private Collection<SinkRecord> prepareData(int numRecords) {
@@ -214,10 +214,10 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    return writer;
  }

  private void writeDataAndWait(ElasticsearchWriter writer, Collection<SinkRecord> records, long waitInterval) throws Exception {
  private void writeDataAndRefresh(ElasticsearchWriter writer, Collection<SinkRecord> records) throws Exception {
    writer.write(records);
    writer.flush();
    writer.stop();
    Thread.sleep(waitInterval);
    refresh();
  }
}