Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +42 −23 Original line number Diff line number Diff line Loading @@ -53,11 +53,13 @@ public class DataConverter { if (key == null) { throw new ConnectException("Key is used as document id and can not be null."); } Schema.Type schemaType; final Schema.Type schemaType; if (keySchema == null) { schemaType = ConnectSchema.schemaType(key.getClass()); if (schemaType == null) if (schemaType == null) { throw new DataException("Java class " + key.getClass() + " does not have corresponding schema type."); } } else { schemaType = keySchema.type(); } Loading Loading @@ -105,13 +107,11 @@ public class DataConverter { return null; } // Handle logical types SchemaBuilder builder; String schemaName = schema.name(); if (schemaName != null) { switch (schemaName) { case Decimal.LOGICAL_NAME: builder = SchemaBuilder.float64(); return builder.build(); return copySchemaBasics(schema, SchemaBuilder.float64()).build(); case Date.LOGICAL_NAME: case Time.LOGICAL_NAME: case Timestamp.LOGICAL_NAME: Loading @@ -120,39 +120,58 @@ public class DataConverter { } Schema.Type schemaType = schema.type(); Schema keySchema; Schema valueSchema; switch (schemaType) { case ARRAY: valueSchema = schema.valueSchema(); builder = SchemaBuilder.array(preProcessSchema(valueSchema)); return builder.build(); case MAP: keySchema = schema.keySchema(); valueSchema = schema.valueSchema(); case ARRAY: { return copySchemaBasics(schema, SchemaBuilder.array(preProcessSchema(schema.valueSchema()))).build(); } case MAP: { Schema keySchema = schema.keySchema(); Schema valueSchema = schema.valueSchema(); String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name(); String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name(); builder = SchemaBuilder.array(SchemaBuilder.struct().name(keyName + "-" + valueName) Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) .field(MAP_KEY, preProcessSchema(keySchema)) .field(MAP_VALUE, preProcessSchema(valueSchema)) .build()); return builder.build(); case STRUCT: builder = SchemaBuilder.struct().name(schema.name()); .build(); return copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build(); } case STRUCT: { SchemaBuilder structBuilder = copySchemaBasics(schema, SchemaBuilder.struct().name(schemaName)); for (Field field : schema.fields()) { builder.field(field.name(), preProcessSchema(field.schema())); structBuilder.field(field.name(), preProcessSchema(field.schema())); } return builder.build(); default: return structBuilder.build(); } default: { return schema; } } } private static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) { if (source.isOptional()) { target.optional(); } if (source.defaultValue() != null) { target.defaultValue(source.defaultValue()); } return target; } // visible for testing static Object preProcessValue(Object value, Schema schema, Schema newSchema) { if (schema == null) { return value; } if (value == null) { if (schema.defaultValue() != null) { return schema.defaultValue(); } if (schema.isOptional()) { return null; } throw new DataException("null value for field that is required and has no default value"); } // Handle logical types String schemaName = schema.name(); Loading Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +42 −23 Original line number Diff line number Diff line Loading @@ -53,11 +53,13 @@ public class DataConverter { if (key == null) { throw new ConnectException("Key is used as document id and can not be null."); } Schema.Type schemaType; final Schema.Type schemaType; if (keySchema == null) { schemaType = ConnectSchema.schemaType(key.getClass()); if (schemaType == null) if (schemaType == null) { throw new DataException("Java class " + key.getClass() + " does not have corresponding schema type."); } } else { schemaType = keySchema.type(); } Loading Loading @@ -105,13 +107,11 @@ public class DataConverter { return null; } // Handle logical types SchemaBuilder builder; String schemaName = schema.name(); if (schemaName != null) { switch (schemaName) { case Decimal.LOGICAL_NAME: builder = SchemaBuilder.float64(); return builder.build(); return copySchemaBasics(schema, SchemaBuilder.float64()).build(); case Date.LOGICAL_NAME: case Time.LOGICAL_NAME: case Timestamp.LOGICAL_NAME: Loading @@ -120,39 +120,58 @@ public class DataConverter { } Schema.Type schemaType = schema.type(); Schema keySchema; Schema valueSchema; switch (schemaType) { case ARRAY: valueSchema = schema.valueSchema(); builder = SchemaBuilder.array(preProcessSchema(valueSchema)); return builder.build(); case MAP: keySchema = schema.keySchema(); valueSchema = schema.valueSchema(); case ARRAY: { return copySchemaBasics(schema, SchemaBuilder.array(preProcessSchema(schema.valueSchema()))).build(); } case MAP: { Schema keySchema = schema.keySchema(); Schema valueSchema = schema.valueSchema(); String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name(); String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name(); builder = SchemaBuilder.array(SchemaBuilder.struct().name(keyName + "-" + valueName) Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName) .field(MAP_KEY, preProcessSchema(keySchema)) .field(MAP_VALUE, preProcessSchema(valueSchema)) .build()); return builder.build(); case STRUCT: builder = SchemaBuilder.struct().name(schema.name()); .build(); return copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build(); } case STRUCT: { SchemaBuilder structBuilder = copySchemaBasics(schema, SchemaBuilder.struct().name(schemaName)); for (Field field : schema.fields()) { builder.field(field.name(), preProcessSchema(field.schema())); structBuilder.field(field.name(), preProcessSchema(field.schema())); } return builder.build(); default: return structBuilder.build(); } default: { return schema; } } } private static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) { if (source.isOptional()) { target.optional(); } if (source.defaultValue() != null) { target.defaultValue(source.defaultValue()); } return target; } // visible for testing static Object preProcessValue(Object value, Schema schema, Schema newSchema) { if (schema == null) { return value; } if (value == null) { if (schema.defaultValue() != null) { return schema.defaultValue(); } if (schema.isOptional()) { return null; } throw new DataException("null value for field that is required and has no default value"); } // Handle logical types String schemaName = schema.name(); Loading