Commit 0696298f authored by Pere Urbon's avatar Pere Urbon
Browse files

fix bug when topic name is in Caps while the index name in elasticsearch...

Fix a bug that occurred when the topic name contains uppercase letters: Elasticsearch requires index names to be all lowercase.
Also added a helper method to simplify the necessary conversion, and a test for this case.
parent 26984c3c
Loading
Loading
Loading
Loading
+19 −8
Original line number Diff line number Diff line
@@ -189,9 +189,9 @@ public class ElasticsearchWriter {
  }

  public void write(Collection<SinkRecord> records) {

    for (SinkRecord sinkRecord : records) {
      final String indexOverride = topicToIndexMap.get(sinkRecord.topic());
      final String index = indexOverride != null ? indexOverride : sinkRecord.topic();
      final String index = convertTopicToIndexName(sinkRecord.topic());
      final boolean ignoreKey = ignoreKeyTopics.contains(sinkRecord.topic()) || this.ignoreKey;
      final boolean ignoreSchema = ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;

@@ -211,6 +211,22 @@ public class ElasticsearchWriter {

      bulkProcessor.add(indexableRecord, flushTimeoutMs);
    }

  }

  /**
   * Returns the Elasticsearch index name to use for the given topic.
   *
   * <p>If {@code topicToIndexMap} contains an entry for the topic, that configured index name is
   * returned unchanged. Otherwise the topic name is lowercased, as Elasticsearch requires all
   * index names to be lowercase.
   * See https://github.com/confluentinc/kafka-connect-elasticsearch/issues/246 for connector details.
   * See https://github.com/elastic/elasticsearch/issues/29420 for elasticsearch details.
   * @param topic The topic name being processed.
   * @return String The index name to use for the topic
   */
  private String convertTopicToIndexName(String topic) {
    final String indexOverride = topicToIndexMap.get(topic);
    // Lowercase with Locale.ROOT so the result does not depend on the JVM default locale
    // (for example, under a Turkish locale "I" would not be lowercased to "i").
    final String index = indexOverride != null
        ? indexOverride
        : topic.toLowerCase(java.util.Locale.ROOT);
    log.debug("Topic " + topic + " was translated as index name " + index);
    return index;
  }

  public void flush() {
@@ -261,12 +277,7 @@ public class ElasticsearchWriter {
  /**
   * Collects the index names that correspond to the given topics.
   *
   * @param assignedTopics the topic names to resolve
   * @return a new set holding the index name resolved for each topic
   */
  private Set<String> indicesForTopics(Set<String> assignedTopics) {
    final Set<String> indices = new HashSet<>();
    for (String topic : assignedTopics) {
      // Resolve every topic through the shared helper only, so the raw topic name is never
      // added to the set next to its converted index name.
      indices.add(convertTopicToIndexName(topic));
    }
    return indices;
  }
+44 −2
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ package io.confluent.connect.elasticsearch;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -25,16 +26,22 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Collections;


@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {

  private static final String TOPIC_IN_CAPS = "AnotherTopicInCaps";
  private static final int PARTITION_113 = 113;
  private static final TopicPartition TOPIC_IN_CAPS_PARTITION = new TopicPartition(TOPIC_IN_CAPS, PARTITION_113);

  private Map<String, String> createProps() {
    Map<String, String> props = new HashMap<>();
    props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
@@ -72,4 +79,39 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
    verifySearchResults(records, true, false);
  }

  @Test
  public void testPutWithTopicInCaps() {
    // Writing a record for a previously unseen topic whose name contains uppercase letters
    // must not fail: the index name derived from the topic is expected to follow the
    // Elasticsearch requirement of being all lowercase.
    InternalTestCluster cluster = ESIntegTestCase.internalCluster();
    cluster.ensureAtLeastNumDataNodes(3);
    Map<String, String> props = createProps();

    ElasticsearchSinkTask task = new ElasticsearchSinkTask();

    String key = "key";
    Schema schema = createSchema();
    Struct record = createRecord(schema);

    SinkRecord sinkRecord = new SinkRecord(TOPIC_IN_CAPS,
            PARTITION_113,
            Schema.STRING_SCHEMA,
            key,
            schema,
            record,
            0);

    try {
      task.start(props, client);
      task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
      task.put(Collections.singleton(sinkRecord));
    } catch (Exception ex) {
      // Keep the original exception as the cause so the failure report shows what went wrong.
      throw new AssertionError(
          "A topic name not in lowercase can not be used as index name in Elasticsearch", ex);
    } finally {
      task.stop();
    }

  }

}