Commit e7874a74 authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

IndexingRequest -> IndexableRecord

parent d6c7baf6
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;

public class BulkIndexingClient implements BulkClient<IndexingRequest, Bulk> {
public class BulkIndexingClient implements BulkClient<IndexableRecord, Bulk> {

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  private static final Set<String> NON_RETRIABLE_ERROR_TYPES = Collections.singleton("mapper_parse_exception");
@@ -43,9 +43,9 @@ public class BulkIndexingClient implements BulkClient<IndexingRequest, Bulk> {
  }

  @Override
  public Bulk bulkRequest(List<IndexingRequest> batch) {
  public Bulk bulkRequest(List<IndexableRecord> batch) {
    final Bulk.Builder builder = new Bulk.Builder();
    for (IndexingRequest request : batch) {
    for (IndexableRecord request : batch) {
      builder.addAction(
          new Index.Builder(request.getPayload())
              .index(request.getIndex())
+2 −2
Original line number Diff line number Diff line
@@ -78,7 +78,7 @@ public class DataConverter {
    }
  }

  public static IndexingRequest convertRecord(
  public static IndexableRecord convertRecord(
      SinkRecord record,
      String type,
      JestClient client,
@@ -142,7 +142,7 @@ public class DataConverter {

    String payload = new String(JSON_CONVERTER.fromConnectData(topic, newSchema, newValue), StandardCharsets.UTF_8);

    return new IndexingRequest(index, type, id, payload);
    return new IndexableRecord(index, type, id, payload);
  }

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
+2 −2
Original line number Diff line number Diff line
@@ -49,7 +49,7 @@ public class ElasticsearchWriter {
  private final long flushTimeoutMs;

  private final Set<String> mappings;
  private final BulkProcessor<IndexingRequest, ?> bulkProcessor;
  private final BulkProcessor<IndexableRecord, ?> bulkProcessor;

  ElasticsearchWriter(
      JestClient client,
@@ -168,7 +168,7 @@ public class ElasticsearchWriter {

  public void write(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
      IndexingRequest request = DataConverter.convertRecord(record, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      IndexableRecord request = DataConverter.convertRecord(record, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(request);
    }
  }
+2 −2
Original line number Diff line number Diff line
@@ -16,14 +16,14 @@

package io.confluent.connect.elasticsearch;

public class IndexingRequest {
public class IndexableRecord {

  private final String index;
  private final String type;
  private final String id;
  private final String payload;

  public IndexingRequest(String index, String type, String id, String payload) {
  public IndexableRecord(String index, String type, String id, String payload) {
    this.index = index;
    this.type = type;
    this.id = id;