Commit dae4acbd authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

Merge branch '3.1.x'

parents cfeeb410 d517c5f9
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -197,7 +197,7 @@ public class ElasticsearchWriter {

      if (!ignoreSchema && !existingMappings.contains(index)) {
        try {
          if (!Mapping.doesMappingExist(client, index, type)) {
          if (Mapping.getMapping(client, index, type) == null) {
            Mapping.createMapping(client, index, type, sinkRecord.valueSchema());
          }
        } catch (IOException e) {
+11 −13
Original line number Diff line number Diff line
@@ -66,21 +66,19 @@ public class Mapping {
  }

  /**
   * Check the whether a mapping exists or not for a type.
   * @param client The client to connect to Elasticsearch.
   * @param index The index to write to Elasticsearch.
   * @param type The type to check.
   * @return Whether the type exists or not.
   * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
   */
  public static boolean doesMappingExist(JestClient client, String index, String type) throws IOException {
    GetMapping getMapping = new GetMapping.Builder().addIndex(index).addType(type).build();
    JestResult result = client.execute(getMapping);
    JsonObject resultJson = result.getJsonObject().getAsJsonObject(index);
    if (resultJson == null) {
      return false;
    }
    JsonObject typeJson = resultJson.getAsJsonObject(type);
    return typeJson != null;
  public static JsonObject getMapping(JestClient client, String index, String type) throws IOException {
    final JestResult result = client.execute(new GetMapping.Builder().addIndex(index).addType(type).build());
    final JsonObject indexRoot = result.getJsonObject().getAsJsonObject(index);
    if (indexRoot == null) {
      return null;
    }
    final JsonObject mappingsJson = indexRoot.getAsJsonObject("mappings");
    if (mappingsJson == null) {
      return  null;
    }
    return mappingsJson.getAsJsonObject(type);
  }

  /**
+1 −4
Original line number Diff line number Diff line
@@ -47,10 +47,7 @@ public class MappingTest extends ElasticsearchSinkTestBase {
    Schema schema = createSchema();
    Mapping.createMapping(client, INDEX, TYPE, schema);

    GetMapping getMapping = new GetMapping.Builder().addIndex(INDEX).addType(TYPE).build();
    JestResult result = client.execute(getMapping);
    JsonObject resultJson = result.getJsonObject();
    JsonObject mapping = resultJson.get(INDEX).getAsJsonObject().get("mappings").getAsJsonObject().get(TYPE).getAsJsonObject();
    JsonObject mapping = Mapping.getMapping(client, INDEX, TYPE);
    assertNotNull(mapping);
    verifyMapping(schema, mapping);
  }