Loading src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java +12 −5 Original line number Diff line number Diff line Loading @@ -376,12 +376,14 @@ public class BulkProcessor<R, B> { } return bulkRsp; } if (ignoreMappingErrors && bulkRsp.getErrorInfo().contains("mapper_parsing_exception")) { log.info("Encountered mapper_parsing_exception when execute batch {} of {} records." + " Ignoring. {}", if (ignoreMappingErrors) { if (responseContainsMappingError(bulkRsp)) { log.info("Encountered mapper_parsing_exception when executing batch {} of {} records." + " Ignoring. Error was {}", batchId, batch.size(), bulkRsp.getErrorInfo()); return bulkRsp; } } retriable = bulkRsp.isRetriable(); throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo()); } catch (Exception e) { Loading @@ -402,6 +404,11 @@ public class BulkProcessor<R, B> { } } private boolean responseContainsMappingError(BulkResponse bulkRsp) { return bulkRsp.getErrorInfo().contains("mapper_parsing_exception") || bulkRsp.getErrorInfo().contains("illegal_argument_exception"); } private synchronized void onBatchCompletion(int batchSize) { inFlightRecords -= batchSize; assert inFlightRecords >= 0; Loading Loading
src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java +12 −5 Original line number Diff line number Diff line Loading @@ -376,12 +376,14 @@ public class BulkProcessor<R, B> { } return bulkRsp; } if (ignoreMappingErrors && bulkRsp.getErrorInfo().contains("mapper_parsing_exception")) { log.info("Encountered mapper_parsing_exception when execute batch {} of {} records." + " Ignoring. {}", if (ignoreMappingErrors) { if (responseContainsMappingError(bulkRsp)) { log.info("Encountered mapper_parsing_exception when executing batch {} of {} records." + " Ignoring. Error was {}", batchId, batch.size(), bulkRsp.getErrorInfo()); return bulkRsp; } } retriable = bulkRsp.isRetriable(); throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo()); } catch (Exception e) { Loading @@ -402,6 +404,11 @@ public class BulkProcessor<R, B> { } } private boolean responseContainsMappingError(BulkResponse bulkRsp) { return bulkRsp.getErrorInfo().contains("mapper_parsing_exception") || bulkRsp.getErrorInfo().contains("illegal_argument_exception"); } private synchronized void onBatchCompletion(int batchSize) { inFlightRecords -= batchSize; assert inFlightRecords >= 0; Loading