Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +8 −7 Original line number Diff line number Diff line Loading @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY; Loading Loading @@ -196,15 +197,15 @@ public class DataConverter { switch (schemaType) { case ARRAY: Collection collection = (Collection) value; ArrayList<Object> result = new ArrayList<>(); List<Object> result = new ArrayList<>(); for (Object element: collection) { result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema())); } return result; case MAP: Schema keySchema = schema.keySchema(); Schema newValueSchema = newSchema.valueSchema(); Schema valueSchema = schema.valueSchema(); Schema newValueSchema = newSchema.valueSchema(); Map<?, ?> map = (Map<?, ?>) value; if (keySchema.type() == Schema.Type.STRING) { Map<Object, Object> processedMap = new HashMap<>(); Loading @@ -214,7 +215,7 @@ public class DataConverter { } return processedMap; } ArrayList<Struct> mapStructs = new ArrayList<>(); List<Struct> mapStructs = new ArrayList<>(); for (Map.Entry<?, ?> entry: map.entrySet()) { Struct mapStruct = new Struct(newValueSchema); mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, newValueSchema.field(MAP_KEY).schema())); Loading Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +8 −7 Original line number Diff line number Diff line Loading @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY; Loading Loading @@ -196,15 +197,15 @@ public class DataConverter { switch (schemaType) { case ARRAY: Collection collection = (Collection) value; ArrayList<Object> result = new ArrayList<>(); List<Object> result = new ArrayList<>(); for (Object element: collection) { result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema())); } return result; case MAP: Schema keySchema = schema.keySchema(); Schema newValueSchema = newSchema.valueSchema(); Schema valueSchema = schema.valueSchema(); Schema newValueSchema = newSchema.valueSchema(); Map<?, ?> map = (Map<?, ?>) value; if (keySchema.type() == Schema.Type.STRING) { Map<Object, Object> processedMap = new HashMap<>(); Loading @@ -214,7 +215,7 @@ public class DataConverter { } return processedMap; } ArrayList<Struct> mapStructs = new ArrayList<>(); List<Struct> mapStructs = new ArrayList<>(); for (Map.Entry<?, ?> entry: map.entrySet()) { Struct mapStruct = new Struct(newValueSchema); mapStruct.put(MAP_KEY, preProcessValue(entry.getKey(), keySchema, newValueSchema.field(MAP_KEY).schema())); Loading