Loading src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +9 −2 Original line number Diff line number Diff line Loading @@ -175,6 +175,8 @@ public class DataConverter { index = index + "-" + indexSuffix; } System.out.println(".---------- Guardando en " + index); final Long version = ignoreKey ? null : 0L;// record.kafkaOffset(); return new IndexableRecord(new Key(index, type, id), payload, version); Loading Loading @@ -204,10 +206,15 @@ public class DataConverter { Struct struct = (Struct) value; if (schema.schema().field("properties") == null) if (schema.schema().field("date") == null && schema.schema().field("properties") == null) return null; String date = struct.getStruct("properties").getString("date").toString(); String date = null; if (schema.schema().field("date") != null) date = struct.getString("date").toString(); else if (schema.schema().field("properties") != null) date = struct.getStruct("properties").getString("date").toString(); if (date == null || !date.contains("T")) return ""; Loading src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +26 −0 Original line number Diff line number Diff line Loading @@ -261,6 +261,32 @@ public class DataConverterTest { assertEquals(expected, result); } @Test public void timeseries() { Schema origSchema = SchemaBuilder.struct().field("date", Schema.INT64_SCHEMA) .field("inserted", Schema.OPTIONAL_FLOAT64_SCHEMA).field("updated", Schema.OPTIONAL_FLOAT64_SCHEMA) .build(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertNotEquals(origSchema, preProcessedSchema); Struct result = (Struct) converter.preProcessValue(new Struct(origSchema).put("date", 1545986513L), origSchema, preProcessedSchema); Struct expected = new Struct(preProcessedSchema) .put("date", new DateTime(1545986513, DateTimeZone.UTC).toString(DataConverter.dateFormat)) .put("inserted", result.getString("inserted")).put("updated", result.getString("updated")); SinkRecord sinkRecord = new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, preProcessedSchema, expected, offset); IndexableRecord actualRecord = converter.convertRecord(sinkRecord, index, type, false, false); assertEquals(expected, result); } @Test public void optionalFieldsWithoutDefaults() { // One primitive type should be enough Loading Loading
src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +9 −2 Original line number Diff line number Diff line Loading @@ -175,6 +175,8 @@ public class DataConverter { index = index + "-" + indexSuffix; } System.out.println(".---------- Guardando en " + index); final Long version = ignoreKey ? null : 0L;// record.kafkaOffset(); return new IndexableRecord(new Key(index, type, id), payload, version); Loading Loading @@ -204,10 +206,15 @@ public class DataConverter { Struct struct = (Struct) value; if (schema.schema().field("properties") == null) if (schema.schema().field("date") == null && schema.schema().field("properties") == null) return null; String date = struct.getStruct("properties").getString("date").toString(); String date = null; if (schema.schema().field("date") != null) date = struct.getString("date").toString(); else if (schema.schema().field("properties") != null) date = struct.getStruct("properties").getString("date").toString(); if (date == null || !date.contains("T")) return ""; Loading
src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +26 −0 Original line number Diff line number Diff line Loading @@ -261,6 +261,32 @@ public class DataConverterTest { assertEquals(expected, result); } @Test public void timeseries() { Schema origSchema = SchemaBuilder.struct().field("date", Schema.INT64_SCHEMA) .field("inserted", Schema.OPTIONAL_FLOAT64_SCHEMA).field("updated", Schema.OPTIONAL_FLOAT64_SCHEMA) .build(); Schema preProcessedSchema = converter.preProcessSchema(origSchema); assertNotEquals(origSchema, preProcessedSchema); Struct result = (Struct) converter.preProcessValue(new Struct(origSchema).put("date", 1545986513L), origSchema, preProcessedSchema); Struct expected = new Struct(preProcessedSchema) .put("date", new DateTime(1545986513, DateTimeZone.UTC).toString(DataConverter.dateFormat)) .put("inserted", result.getString("inserted")).put("updated", result.getString("updated")); SinkRecord sinkRecord = new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, preProcessedSchema, expected, offset); IndexableRecord actualRecord = converter.convertRecord(sinkRecord, index, type, false, false); assertEquals(expected, result); } @Test public void optionalFieldsWithoutDefaults() { // One primitive type should be enough Loading