Commit cbb6e4c6 authored by Liquan Pei's avatar Liquan Pei
Browse files

Make callback handler thread safe

parent a9b4849d
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -19,5 +19,5 @@ connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
key.ignore=true
transport.addresses=localhost:9300
transport.addresses=localhost:9200
type.name=kafka-connect
 No newline at end of file
+2 −2
Original line number Diff line number Diff line
@@ -12,8 +12,8 @@
        <confluent.version>3.0.0-SNAPSHOT</confluent.version>
        <kafka.version>0.10.1.0-SNAPSHOT</kafka.version>
        <junit.version>4.12</junit.version>
        <es.version>2.2.1</es.version>
        <lucene.version>5.4.1</lucene.version>
        <es.version>2.3.3</es.version>
        <lucene.version>5.5.0</lucene.version>
        <slf4j.version>1.7.5</slf4j.version>
        <jna.version>4.2.1</jna.version>
        <hamcrest.version>2.0.0.0</hamcrest.version>
+1 −1
Original line number Diff line number Diff line
@@ -119,7 +119,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

  public static ConfigDef baseConfigDef() {
    return new ConfigDef()
        .define(HTTP_ADDRESSES_CONFIG, Type.LIST, Importance.HIGH, HTTP_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, HTTP_ADDRESSES_DISPLAY)
        .define(HTTP_ADDRESSES_CONFIG, Type.STRING, Importance.HIGH, HTTP_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, HTTP_ADDRESSES_DISPLAY)
        .define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH, TYPE_NAME_DOC, ELASTICSEARCH_GROUP, 2, Width.SHORT, TYPE_NAME_DISPLAY)
        .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 3, Width.SHORT, KEY_IGNORE_DISPLAY)
        .define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
+2 −2
Original line number Diff line number Diff line
@@ -90,9 +90,9 @@ public class ElasticsearchSinkTask extends SinkTask {
      if (client != null) {
        this.client = client;
      } else {
        List<String> addresses = config.getList(ElasticsearchSinkConnectorConfig.HTTP_ADDRESSES_CONFIG);
        String address = config.getString(ElasticsearchSinkConnectorConfig.HTTP_ADDRESSES_CONFIG);
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(addresses.get(0)).multiThreaded(true).build());
        factory.setHttpClientConfig(new HttpClientConfig.Builder(address).multiThreaded(true).build());
        this.client = factory.getObject();
      }

+8 −8
Original line number Diff line number Diff line
@@ -79,16 +79,16 @@ public class Mapping {
    }
    GetMapping getMapping = new GetMapping.Builder().addIndex(index).addType(type).build();
    JestResult result = client.execute(getMapping);
    JsonObject resultJson = result.getJsonObject();
    if (resultJson.getAsJsonObject(index) == null || resultJson.getAsJsonObject(index).getAsJsonObject(type) == null) {
    JsonObject resultJson = result.getJsonObject().getAsJsonObject(index);
    if (resultJson == null) {
      return false;
    } else {
      boolean exist = resultJson.getAsJsonObject(index).getAsJsonObject(type) != null;
      if (exist) {
        mappings.add(index);
    }
      return exist;
    JsonObject typeJson = resultJson.getAsJsonObject(type);
    if (typeJson == null) {
      return false;
    }
    mappings.add(index);
    return true;
  }

  /**
Loading