src/main/java/io/confluent/connect/elasticsearch/DataConverter.java (+13 −6)

@@ -48,6 +48,7 @@
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.storage.Converter;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;

 import com.fasterxml.jackson.databind.ObjectMapper;

@@ -67,6 +68,9 @@
   private final boolean useCompactMapEntries;
   private final BehaviorOnNullValues behaviorOnNullValues;

+  // TODO: pass this in via the connector config
+  private final String rollOverSuffixPattern = "yyyy-MM";
+
   /**
    * Create a DataConverter, specifying how map entries with string keys within
    * record values should be written to JSON. Compact map entries are written as

@@ -191,7 +195,7 @@
     Object value = ignoreSchema ? record.value() : preProcessValue(record.value(), record.valueSchema(), schema);

-    if (value instanceof Struct)
+    if (value instanceof Struct) // check whether a config setting exists to enable rollover
       result.put(INDEX_SUFFIX_KEY, getRollOverSuffix(schema, value));

     byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);

@@ -207,16 +211,19 @@
     if (schema.schema().field("date") == null && schema.schema().field("properties") == null)
       return null;

-    String date = null;
+    String dateTimeString = null;

     if (schema.schema().field("date") != null)
-      date = struct.getString("date").toString();
+      dateTimeString = struct.getString("date").toString();
     else if (schema.schema().field("properties") != null)
-      date = struct.getStruct("properties").getString("date").toString();
+      dateTimeString = struct.getStruct("properties").getString("date").toString();

-    if (date == null || !date.contains("T"))
+    if (dateTimeString == null)
       return "";

-    return date.split("T")[0];
+    DateTime dateTime = DateTimeFormat.forPattern(dateFormat).parseDateTime(dateTimeString);
+    return dateTime.toString(DateTimeFormat.forPattern(rollOverSuffixPattern));
   }

   // We need to pre process the Kafka Connect schema before converting to JSON as

src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java (+12 −2)

@@ -39,6 +39,7 @@
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
 import org.junit.Before;
 import org.junit.Test;

@@ -275,8 +276,9 @@
     Struct result = (Struct) converter.preProcessValue(new Struct(origSchema).put("date", 1545986513L),
         origSchema, preProcessedSchema);

-    Struct expected = new Struct(preProcessedSchema)
-        .put("date", new DateTime(1545986513, DateTimeZone.UTC).toString(DataConverter.dateFormat))
+    DateTime dateTime = new DateTime(1545986513, DateTimeZone.UTC);
+    Struct expected = new Struct(preProcessedSchema).put("date", dateTime.toString(DataConverter.dateFormat))
         .put("inserted", result.getString("inserted")).put("updated", result.getString("updated"));

     SinkRecord sinkRecord = new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, preProcessedSchema,

@@ -284,6 +286,14 @@
     IndexableRecord actualRecord = converter.convertRecord(sinkRecord, index, type, false, false);

+    String indexActualRecord = actualRecord.key.index;
+
+    // TODO: pass this in via the connector config
+    String rollOverSuffixPattern = "yyyy-MM";
+    String indexExpected = index + "-" + dateTime.toString(DateTimeFormat.forPattern(rollOverSuffixPattern));
+
+    assertEquals(indexExpected, indexActualRecord);
+
     assertEquals(expected, result);
   }
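In isolation, the rollover logic introduced by this diff reduces to parsing the record's "date" string with the connector's existing dateFormat pattern and re-formatting it with the (currently hard-coded) yyyy-MM suffix pattern, so an index named logs would be targeted as logs-2018-12 for a December 2018 event. The snippet below is a minimal standalone sketch of that transformation, not the connector code itself; the input pattern yyyy-MM-dd'T'HH:mm:ss and the base index name logs are assumptions for illustration, standing in for the connector's dateFormat constant and the sink's configured index.

import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class RollOverSuffixSketch {
  public static void main(String[] args) {
    // Assumed input pattern; the real connector parses with its own dateFormat constant.
    DateTimeFormatter inputFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss");
    // Hard-coded suffix pattern, as in the PR; the TODO notes it should come from the connector config.
    DateTimeFormatter suffixFormat = DateTimeFormat.forPattern("yyyy-MM");

    String dateTimeString = "2018-12-28T09:21:53";        // value of the record's "date" field
    DateTime dateTime = inputFormat.parseDateTime(dateTimeString);

    String suffix = dateTime.toString(suffixFormat);       // "2018-12"
    String index = "logs";                                  // hypothetical base index name
    System.out.println(index + "-" + suffix);               // prints: logs-2018-12
  }
}

The same composition (base index, "-", formatted suffix) is what the test asserts against actualRecord.key.index, so records from different months land in different monthly indices.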