Commit cfa5b07a authored by Shikhar Bhushan's avatar Shikhar Bhushan
Browse files

timeout for add() as well to prevent indefinite blocking

parent e7874a74
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -169,7 +169,7 @@ public class ElasticsearchWriter {
  public void write(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
      IndexableRecord request = DataConverter.convertRecord(record, type, client, ignoreKey, ignoreSchema, topicConfigs, mappings);
      bulkProcessor.add(request);
      bulkProcessor.add(request, flushTimeoutMs);
    }
  }

+14 −4
Original line number Diff line number Diff line
@@ -254,20 +254,30 @@ public class BulkProcessor<R, B> {
    throwIfStopping();
  }

  public synchronized void add(R record) {
    add(record, Long.MAX_VALUE);
  }

  /**
   * Add a record, may block if at capacity with respect to {@code maxBufferedRecords}.
   * Add a record, may block upto {@code timeoutMs} if at capacity with respect to {@code maxBufferedRecords}.
   *
   * If any task has failed prior to or while blocked in the add, {@link ConnectException} will be thrown with that error.
   */
  public synchronized void add(R record) {
    while (!isTerminal() && bufferedRecords() >= maxBufferedRecords) {
  public synchronized void add(R record, long timeoutMs) {
    final long addStartTimeMs = time.milliseconds();
    for (long elapsedMs = time.milliseconds() - addStartTimeMs;
         !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() >= maxBufferedRecords;
         elapsedMs = time.milliseconds() - addStartTimeMs) {
      try {
        wait();
        wait(timeoutMs - elapsedMs);
      } catch (InterruptedException e) {
        throw new ConnectException(e);
      }
    }
    throwIfTerminal();
    if (bufferedRecords() >= maxBufferedRecords) {
      throw new ConnectException("Add timeout expired before buffer availability");
    }
    unsentRecords.addLast(record);
    notifyAll();
  }