Commit a42fd479 authored by Conor Mongey's avatar Conor Mongey
Browse files

Cache calls to create indices

So multiple calls with the same index does not make a network call
parent 09f23106
Loading
Loading
Loading
Loading
+0 −9
Original line number Diff line number Diff line
@@ -45,15 +45,6 @@ public interface ElasticsearchClient extends AutoCloseable {
   */
  void createIndices(Set<String> indices);

  /**
   * Gets the Elasticsearch version.
   *
   * @param index the index check exists
   * @return whether the index exists
   * @throws IOException if the client cannot execute the request
   */
  boolean indexExists(String index);

  /**
   * Creates an explicit mapping.
   *
+5 −8
Original line number Diff line number Diff line
@@ -24,12 +24,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.HashMap;
import java.util.Objects;
import java.util.Map;
import java.util.Set;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
@@ -255,11 +256,7 @@ public class ElasticsearchWriter {

      if (!ignoreSchema && !existingMappings.contains(index)) {
        try {
          if (client.indexExists(index) == false) {
            Set<String> indicies = new HashSet<String>();
            indicies.add(index);
            client.createIndices(indicies);
          }
          client.createIndices(new HashSet<>(Arrays.asList(index)));
          if (Mapping.getMapping(client, index, type) == null) {
            Mapping.createMapping(client, index, type, sinkRecord.valueSchema());
          }
+5 −2
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -70,6 +71,7 @@ public class JestElasticsearchClient implements ElasticsearchClient {
  private static final Logger LOG = LoggerFactory.getLogger(JestElasticsearchClient.class);

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  private final HashSet<String> createdIndices = new HashSet<>();

  private final JestClient client;
  private final Version version;
@@ -215,7 +217,7 @@ public class JestElasticsearchClient implements ElasticsearchClient {
    return version;
  }

  public boolean indexExists(String index) {
  private boolean indexExists(String index) {
    Action<JestResult> action = new IndicesExists.Builder(index).build();
    try {
      JestResult result = client.execute(action);
@@ -227,7 +229,7 @@ public class JestElasticsearchClient implements ElasticsearchClient {

  public void createIndices(Set<String> indices) {
    for (String index : indices) {
      if (!indexExists(index)) {
      if (!this.createdIndices.contains(index) && !indexExists(index)) {
        CreateIndex createIndex = new CreateIndex.Builder(index).build();
        try {
          JestResult result = client.execute(createIndex);
@@ -241,6 +243,7 @@ public class JestElasticsearchClient implements ElasticsearchClient {
        } catch (IOException e) {
          throw new ConnectException(e);
        }
        this.createdIndices.add(index);
      }
    }
  }