Commit 3691c0fa authored by Shikhar Bhushan
Browse files

Create and start ElasticsearchWriter from connector.start(), create indices...

Create and start ElasticsearchWriter from connector.start(), create indices with assigned topics in open()

This also allows removing the dependency on SinkTaskContext from ElasticsearchWriter
parent ab5f0159
Loading
Loading
Loading
Loading
+14 −15
Original line number Diff line number Diff line
@@ -41,7 +41,6 @@ public class ElasticsearchSinkTask extends SinkTask {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
  private ElasticsearchWriter writer;
  private JestClient client;
  private ElasticsearchWriter.Builder builder;

  @Override
  public String version() {
@@ -85,7 +84,7 @@ public class ElasticsearchSinkTask extends SinkTask {
        this.client = factory.getObject();
      }

      builder = new ElasticsearchWriter.Builder(this.client)
      ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
          .setType(type)
          .setIgnoreKey(ignoreKey)
          .setIgnoreSchema(ignoreSchema)
@@ -96,9 +95,10 @@ public class ElasticsearchSinkTask extends SinkTask {
          .setBatchSize(batchSize)
          .setLingerMs(lingerMs)
          .setRetryBackoffMs(retryBackoffMs)
          .setMaxRetry(maxRetry)
          .setContext(context);
          .setMaxRetry(maxRetry);

      writer = builder.build();
      writer.start();
    } catch (ConfigException e) {
      throw new ConnectException("Couldn't start ElasticsearchSinkTask due to configuration error:", e);
    }
@@ -107,37 +107,36 @@ public class ElasticsearchSinkTask extends SinkTask {
  /**
   * Handles the opening of topic partitions for this task: reduces the given partitions
   * to their distinct topic names and passes that set to {@code writer.createIndices}.
   *
   * @param partitions the topic partitions being opened
   */
  @Override
  public void open(Collection<TopicPartition> partitions) {
    log.debug("Opening the task for topic partitions: {}", partitions);
    // NOTE(review): this listing is a diff view that shows removed and added lines without
    // markers. This assignment looks like the pre-change line (the commit message says the
    // writer is now created and started from start()) -- confirm against the real file.
    writer = builder.build();
    // Several partitions may share a topic, so collect the topic names into a set.
    Set<String> topics = new HashSet<>();
    for (TopicPartition tp: partitions) {
      topics.add(tp.topic());
    }
    writer.createIndices(topics);
  }

  /**
   * Logs the incoming records at trace level and hands them to the writer.
   *
   * @param records the sink records to write
   * @throws ConnectException declared by the signature; presumably propagated from
   *         {@code writer.write} -- confirm
   */
  @Override
  public void put(Collection<SinkRecord> records) throws ConnectException {
    log.trace("Putting {} to Elasticsearch.", records);
    // NOTE(review): the null guard and its closing brace are not indented consistently with
    // the call they enclose, which suggests they are removed lines in this diff view -- confirm.
    if (writer != null) {
    writer.write(records);
  }
  }

  /**
   * Logs the offsets at trace level and flushes the writer. The offsets are only logged;
   * they are not passed on to the writer.
   *
   * @param offsets the offsets supplied with the flush request
   */
  @Override
  public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    log.trace("Flushing data to Elasticsearch with the following offsets: {}", offsets);
    // NOTE(review): the null guard and its closing brace are not indented consistently with
    // the call they enclose, which suggests they are removed lines in this diff view -- confirm.
    if (writer != null) {
    writer.flush();
  }
  }

  /**
   * Handles the closing of topic partitions: if a writer exists, closes it and drops the
   * reference so later null checks see that no writer is available.
   *
   * @param partitions the topic partitions being closed (only logged here)
   */
  @Override
  public void close(Collection<TopicPartition> partitions) {
    log.debug("Closing the task for topic partitions: {}", partitions);
    // NOTE(review): the ElasticsearchWriter part of this diff appears to replace close() with
    // start()/stop(), so this guarded block may be the pre-change body -- confirm.
    if (writer != null) {
      writer.close();
      writer = null;
    }
  }

  @Override
  public void stop() throws ConnectException {
    log.info("Stopping ElasticsearchSinkTask.");
    if (writer != null) {
      writer.stop();
    }
    if (client != null) {
      client.shutdownClient();
    }
+17 −42
Original line number Diff line number Diff line
@@ -16,16 +16,15 @@

package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -65,7 +64,6 @@ public class ElasticsearchWriter {
  private final Map<String, TopicConfig> topicConfigs;
  private final long flushTimeoutMs;
  private final long maxBufferedRecords;
  private final SinkTaskContext context;
  private final Set<String> mappings;

  /**
@@ -80,7 +78,6 @@ public class ElasticsearchWriter {
   * @param maxInFlightRequests The max number of inflight requests allowed.
   * @param batchSize Approximately the max number of records each writer will buffer.
   * @param lingerMs The time to wait before sending a batch.
   * @param context The SinkTaskContext.
   */
  ElasticsearchWriter(
      JestClient client,
@@ -94,31 +91,20 @@ public class ElasticsearchWriter {
      int batchSize,
      long lingerMs,
      int maxRetry,
      long retryBackoffMs,
      SinkTaskContext context) {
      long retryBackoffMs) {

    this.client = client;
    this.type = type;
    this.ignoreKey = ignoreKey;
    this.ignoreSchema = ignoreSchema;

    if (topicConfigs == null) {
      this.topicConfigs = new HashMap<>();
    } else {
      this.topicConfigs = topicConfigs;
    }
    this.topicConfigs = topicConfigs == null ? Collections.<String, TopicConfig>emptyMap() : topicConfigs;

    this.flushTimeoutMs = flushTimeoutMs;
    this.maxBufferedRecords  = maxBufferedRecords;

    this.context = context;

    // create index if needed.
    createIndices(topicConfigs);

    // Start the BulkProcessor
    bulkProcessor = new BulkProcessor(new HttpClient(client), maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, createDefaultListener());
    bulkProcessor.start();

    //Create mapping cache
    mappings = new HashSet<>();
@@ -137,7 +123,6 @@ public class ElasticsearchWriter {
    private long lingerMs;
    private int maxRetry;
    private long retryBackoffMs;
    private SinkTaskContext context;

    /**
     * Constructor of ElasticsearchWriter Builder.
@@ -259,23 +244,13 @@ public class ElasticsearchWriter {
      return this;
    }

    /**
     * Set the SinkTaskContext.
     * The value is stored on the builder; the variant of {@code build()} that still takes a
     * context passes it to the ElasticsearchWriter constructor as the last argument.
     * NOTE(review): this setter appears to be removed by the change shown in this diff -- confirm.
     * @param context The SinkTaskContext.
     * @return an instance of ElasticsearchWriter Builder.
     */
    public Builder setContext(SinkTaskContext context) {
      this.context = context;
      return this;
    }

    /**
     * Build the ElasticsearchWriter.
     * @return an instance of ElasticsearchWriter.
     */
    public ElasticsearchWriter build() {
      return new ElasticsearchWriter(
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs, context);
          client, type, ignoreKey, ignoreSchema, topicConfigs, flushTimeoutMs, maxBufferedRecords, maxInFlightRequests, batchSize, lingerMs, maxRetry, retryBackoffMs);
    }
  }

@@ -304,7 +279,11 @@ public class ElasticsearchWriter {
    }
  }

  public void close() {
  public void start() {
    bulkProcessor.start();
  }

  public void stop() {
    bulkProcessor.stop();
    try {
      bulkProcessor.awaitStop(flushTimeoutMs);
@@ -325,19 +304,15 @@ public class ElasticsearchWriter {
    }
  }

  private void createIndices(Map<String, TopicConfig> topicConfigs) {
    Set<TopicPartition> assignment = context.assignment();
    Set<String> topics = new HashSet<>();
    for (TopicPartition tp: assignment) {
      String topic = tp.topic();
      if (!topicConfigs.containsKey(topic)) {
        topics.add(topic);
      }
  public void createIndices(Set<String> assignedTopics) {
    Set<String> indices = new HashSet<>();
    for (String topic: assignedTopics) {
      final TopicConfig topicConfig = topicConfigs.get(topic);
      if (topicConfig != null) {
        indices.add(topicConfig.getIndex());
      } else {
        indices.add(topic);
      }

    Set<String> indices = new HashSet<>(topics);
    for (String topic: topicConfigs.keySet()) {
      indices.add(topicConfigs.get(topic).getIndex());
    }
    for (String index: indices) {
      if (!indexExists(index)) {
+3 −2
Original line number Diff line number Diff line
@@ -26,8 +26,10 @@ import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
@@ -48,9 +50,8 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
    Map<String, String> props = createProps();

    ElasticsearchSinkTask task = new ElasticsearchSinkTask();
    task.initialize(context);
    task.start(props, client);
    task.open(assignment);
    task.open(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));

    String key = "key";
    Schema schema = createSchema();
+1 −80
Original line number Diff line number Diff line
@@ -49,8 +49,6 @@ import io.searchbox.core.SearchResult;

public class ElasticsearchSinkTestBase extends ESIntegTestCase {

  protected static Set<TopicPartition> assignment;

  protected static final String TYPE = "kafka-connect";
  protected static final long SLEEP_INTERVAL_MS = 2000;

@@ -61,29 +59,13 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase {
  protected static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
  protected static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
  protected static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
  protected static SinkTaskContext context;

  protected final JestClientFactory factory = new JestClientFactory();
  protected JestHttpClient client;

  /**
   * JUnit {@code @BeforeClass} hook: fills the static {@code assignment} set with the three
   * test topic partitions and creates the shared mock {@code context}.
   */
  @BeforeClass
  public static void createAssignment() {
    assignment = new HashSet<>();
    assignment.add(TOPIC_PARTITION);
    assignment.add(TOPIC_PARTITION2);
    assignment.add(TOPIC_PARTITION3);
    // The mock's assignment() returns the static set populated above.
    context = new MockSinkTaskContext();
  }

  /**
   * JUnit {@code @AfterClass} hook: empties the static {@code assignment} set and releases
   * the shared mock {@code context}.
   */
  @AfterClass
  public static void clearAssignment() {
    assignment.clear();
    context = null;
  }

  @Before
  public void setUp() throws Exception {
    super.setUp();
    final JestClientFactory factory = new JestClientFactory();
    factory.setHttpClientConfig(
        new HttpClientConfig
            .Builder("http://localhost:" + getPort())
@@ -172,65 +154,4 @@ public class ElasticsearchSinkTestBase extends ESIntegTestCase {
        .build();
  }

  /**
   * Minimal in-memory SinkTaskContext for tests. It records submitted offsets and the
   * requested timeout, reports the enclosing test base's static {@code assignment} set,
   * and treats pause/resume as no-ops.
   */
  protected static class MockSinkTaskContext implements SinkTaskContext {

    // Offsets submitted through the offset(...) methods, keyed by topic partition.
    private Map<TopicPartition, Long> offsets;
    // Last timeout passed to timeout(long); -1 until one is set.
    private long timeoutMs;

    /**
     * Creates a context with no recorded offsets and a timeout of -1.
     */
    public MockSinkTaskContext() {
      this.offsets = new HashMap<>();
      this.timeoutMs = -1L;
    }

    /**
     * Records all of the given offsets, overwriting entries for partitions already present.
     * @param offsets the offsets to record
     */
    @Override
    public void offset(Map<TopicPartition, Long> offsets) {
      this.offsets.putAll(offsets);
    }

    /**
     * Records the offset for a single topic partition.
     * @param tp the topic partition
     * @param offset the offset to record for it
     */
    @Override
    public void offset(TopicPartition tp, long offset) {
      offsets.put(tp, offset);
    }

    /**
     * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework.
     * Note that the internal map itself is returned, not a copy.
     * @return the map of offsets
     */
    public Map<TopicPartition, Long> offsets() {
      return offsets;
    }

    /**
     * Stores the timeout requested by the task.
     * @param timeoutMs the timeout in milliseconds
     */
    @Override
    public void timeout(long timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    /**
     * Get the timeout in milliseconds set by SinkTasks. Used by the Copycat framework.
     * @return the backoff timeout in milliseconds.
     */
    public long timeout() {
      return timeoutMs;
    }

    /**
     * Get the current assignment, which here is the static {@code assignment} set of the
     * enclosing test base class.
     * @return the set of assigned topic partitions.
     */

    @Override
    public Set<TopicPartition> assignment() {
      return assignment;
    }

    /**
     * No-op in this mock; the given partitions are ignored.
     */
    @Override
    public void pause(TopicPartition... partitions) {
      return;
    }

    /**
     * No-op in this mock; the given partitions are ignored.
     */
    @Override
    public void resume(TopicPartition... partitions) {
      return;
    }
  }
}
+14 −12
Original line number Diff line number Diff line
@@ -49,7 +49,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  @Test
  public void testWriter() throws Exception {
    Collection<SinkRecord> records = prepareData(2);
    ElasticsearchWriter writer = createWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());
    ElasticsearchWriter writer = initWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());
    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);

    Collection<SinkRecord> expected = Collections.singletonList(new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 1));
@@ -59,7 +59,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  @Test
  public void testWriterIgnoreKey() throws Exception {
    Collection<SinkRecord> records = prepareData(2);
    ElasticsearchWriter writer = createWriter(client, true, false, Collections.<String, TopicConfig>emptyMap());
    ElasticsearchWriter writer = initWriter(client, true, false, Collections.<String, TopicConfig>emptyMap());
    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    verifySearch(records, search(client), true);
  }
@@ -67,7 +67,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
  @Test
  public void testWriterIgnoreSchema() throws Exception {
    Collection<SinkRecord> records = prepareData(2);
    ElasticsearchWriter writer = createWriter(client, true, true, Collections.<String, TopicConfig>emptyMap());
    ElasticsearchWriter writer = initWriter(client, true, true, Collections.<String, TopicConfig>emptyMap());

    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    verifySearch(records, search(client), true);
@@ -81,7 +81,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    Map<String, TopicConfig> topicConfigs = new HashMap<>();
    topicConfigs.put(TOPIC, topicConfig);

    ElasticsearchWriter writer = createWriter(client, false, true, topicConfigs);
    ElasticsearchWriter writer = initWriter(client, false, true, topicConfigs);
    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);
    verifySearch(records, search(client), true);
  }
@@ -92,7 +92,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, otherSchema, otherRecord, 0);
    records.add(sinkRecord);

    ElasticsearchWriter writer = createWriter(client, true, false, Collections.<String, TopicConfig>emptyMap());
    ElasticsearchWriter writer = initWriter(client, true, false, Collections.<String, TopicConfig>emptyMap());

    writer.write(records);
    Thread.sleep(5000);
@@ -121,7 +121,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    records.add(sinkRecord);
    expected.add(sinkRecord);

    ElasticsearchWriter writer = createWriter(client, true, false, Collections.<String, TopicConfig>emptyMap());
    ElasticsearchWriter writer = initWriter(client, true, false, Collections.<String, TopicConfig>emptyMap());

    writer.write(records);
    records.clear();
@@ -155,7 +155,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0);
    records.add(sinkRecord);

    ElasticsearchWriter writer = createWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());
    ElasticsearchWriter writer = initWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());

    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);

@@ -179,7 +179,7 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 0);
    records.add(sinkRecord);

    ElasticsearchWriter writer = createWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());
    ElasticsearchWriter writer = initWriter(client, false, false, Collections.<String, TopicConfig>emptyMap());

    writeDataAndWait(writer, records, SLEEP_INTERVAL_MS);

@@ -195,8 +195,8 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    return records;
  }

  private ElasticsearchWriter createWriter(JestClient client, boolean ignoreKey, boolean ignoreSchema, Map<String, TopicConfig> topicConfigs) {
    return new ElasticsearchWriter.Builder(client)
  private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, boolean ignoreSchema, Map<String, TopicConfig> topicConfigs) {
    ElasticsearchWriter writer = new ElasticsearchWriter.Builder(client)
        .setType(TYPE)
        .setIgnoreKey(ignoreKey)
        .setIgnoreSchema(ignoreSchema)
@@ -208,14 +208,16 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
        .setLingerMs(1000)
        .setRetryBackoffMs(1000)
        .setMaxRetry(3)
        .setContext(context)
        .build();
    writer.start();
    writer.createIndices(Collections.singleton(TOPIC));
    return writer;
  }

  /**
   * Test helper: writes the records, flushes the writer, shuts it down, then sleeps so the
   * caller can query Elasticsearch afterwards.
   *
   * @param writer the writer under test
   * @param records the records to write
   * @param waitInterval how long to sleep after shutting the writer down, in milliseconds
   * @throws InterruptedException if the sleep is interrupted
   */
  private void writeDataAndWait(ElasticsearchWriter writer, Collection<SinkRecord> records, long waitInterval) throws InterruptedException {
    writer.write(records);
    writer.flush();
    // NOTE(review): both close() and stop() appear because this is a diff view without
    // markers; the writer diff appears to replace close() with stop(), so presumably only
    // stop() is in the resulting file -- confirm.
    writer.close();
    writer.stop();
    Thread.sleep(waitInterval);
  }
}