Commit 287e671e authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade soporte para procesar GeoJson

Añade fecha al topic para la rotación
Deserializa geometría como objeto en lugar de string
Formatea la fecha
Añade fechas inserted & updated
parent f0a8cf82
Loading
Loading
Loading
Loading
+453 −381
Original line number Diff line number Diff line
@@ -16,6 +16,21 @@

package io.confluent.connect.elasticsearch;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
@@ -31,25 +46,17 @@ import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DataConverter {

	private static final Converter JSON_CONVERTER;

	public static final String dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ";

	static {
		JSON_CONVERTER = new JsonConverter();
		JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
@@ -59,20 +66,23 @@ public class DataConverter {
	private final BehaviorOnNullValues behaviorOnNullValues;

	/**
   * Create a DataConverter, specifying how map entries with string keys within record
   * values should be written to JSON. Compact map entries are written as
   * <code>"entryKey": "entryValue"</code>, while the non-compact form are written as a nested
   * document such as <code>{"key": "entryKey", "value": "entryValue"}</code>. All map entries
   * with non-string keys are always written as nested documents.
	 * Create a DataConverter, specifying how map entries with string keys within
	 * record values should be written to JSON. Compact map entries are written as
	 * <code>"entryKey": "entryValue"</code>, while the non-compact form are written
	 * as a nested document such as
	 * <code>{"key": "entryKey", "value": "entryValue"}</code>. All map entries with
	 * non-string keys are always written as nested documents.
	 *
   * @param useCompactMapEntries true for compact map entries with string keys, or false for
   *                             the nested document form.
   * @param behaviorOnNullValues behavior for handling records with null values; may not be null
	 * @param useCompactMapEntries
	 *            true for compact map entries with string keys, or false for the
	 *            nested document form.
	 * @param behaviorOnNullValues
	 *            behavior for handling records with null values; may not be null
	 */
	public DataConverter(boolean useCompactMapEntries, BehaviorOnNullValues behaviorOnNullValues) {
		this.useCompactMapEntries = useCompactMapEntries;
    this.behaviorOnNullValues =
        Objects.requireNonNull(behaviorOnNullValues, "behaviorOnNullValues cannot be null.");
		this.behaviorOnNullValues = Objects.requireNonNull(behaviorOnNullValues,
				"behaviorOnNullValues cannot be null.");
	}

	private String convertKey(Schema keySchema, Object key) {
@@ -84,11 +94,7 @@ public class DataConverter {
		if (keySchema == null) {
			schemaType = ConnectSchema.schemaType(key.getClass());
			if (schemaType == null) {
        throw new DataException(
            "Java class "
            + key.getClass()
            + " does not have corresponding schema type."
        );
				throw new DataException("Java class " + key.getClass() + " does not have corresponding schema type.");
			}
		} else {
			schemaType = keySchema.type();
@@ -106,87 +112,97 @@ public class DataConverter {
		}
	}

  public IndexableRecord convertRecord(
      SinkRecord record,
      String index,
      String type,
      boolean ignoreKey,
      boolean ignoreSchema
  ) {
	public IndexableRecord convertRecord(SinkRecord record, String index, String type, boolean ignoreKey,
			boolean ignoreSchema) {
		if (record.value() == null) {
			switch (behaviorOnNullValues) {
			case IGNORE:
				return null;
			case DELETE:
				if (record.key() == null) {
            // Since the record key is used as the ID of the index to delete and we don't have a key
					// Since the record key is used as the ID of the index to delete and we don't
					// have a key
					// for this record, we can't delete anything anyways, so we ignore the record.
            // We can also disregard the value of the ignoreKey parameter, since even if it's true
            // the resulting index we'd try to delete would be based solely off topic/partition/
            // offset information for the SinkRecord. Since that information is guaranteed to be
            // unique per message, we can be confident that there wouldn't be any corresponding
					// We can also disregard the value of the ignoreKey parameter, since even if
					// it's true
					// the resulting index we'd try to delete would be based solely off
					// topic/partition/
					// offset information for the SinkRecord. Since that information is guaranteed
					// to be
					// unique per message, we can be confident that there wouldn't be any
					// corresponding
					// index present in ES to delete anyways.
					return null;
				}
          // Will proceed as normal, ultimately creating an IndexableRecord with a null payload
				// Will proceed as normal, ultimately creating an IndexableRecord with a null
				// payload
				break;
			case FAIL:
				throw new DataException(String.format(
						"Sink record with key of %s and null value encountered for topic/partition/offset "
								+ "%s/%s/%s (to ignore future records like this change the configuration property "
								+ "'%s' from '%s' to '%s')",
              record.key(),
              record.topic(),
              record.kafkaPartition(),
              record.kafkaOffset(),
              ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG,
              BehaviorOnNullValues.FAIL,
              BehaviorOnNullValues.IGNORE
          ));
						record.key(), record.topic(), record.kafkaPartition(), record.kafkaOffset(),
						ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL,
						BehaviorOnNullValues.IGNORE));
			default:
          throw new RuntimeException(String.format(
              "Unknown value for %s enum: %s",
              BehaviorOnNullValues.class.getSimpleName(),
              behaviorOnNullValues
          ));
				throw new RuntimeException(String.format("Unknown value for %s enum: %s",
						BehaviorOnNullValues.class.getSimpleName(), behaviorOnNullValues));
			}
		}

		final String id;
		if (ignoreKey) {
      id = record.topic()
           + "+" + String.valueOf((int) record.kafkaPartition())
           + "+" + String.valueOf(record.kafkaOffset());
			id = record.topic() + "+" + String.valueOf((int) record.kafkaPartition()) + "+"
					+ String.valueOf(record.kafkaOffset());
		} else {
			id = convertKey(record.keySchema(), record.key());
		}

    final String payload = getPayload(record, ignoreSchema);
		final String payload = getPayload(record, ignoreSchema, index);
		final Long version = ignoreKey ? null : record.kafkaOffset();

		return new IndexableRecord(new Key(index, type, id), payload, version);
	}

  private String getPayload(SinkRecord record, boolean ignoreSchema) {
	private String getPayload(SinkRecord record, boolean ignoreSchema, String index) {
		if (record.value() == null) {
			return null;
		}

    Schema schema = ignoreSchema
        ? record.valueSchema()
        : preProcessSchema(record.valueSchema());
		Schema schema = ignoreSchema ? record.valueSchema() : preProcessSchema(record.valueSchema());

    Object value = ignoreSchema
        ? record.value()
        : preProcessValue(record.value(), record.valueSchema(), schema);
		Object value = ignoreSchema ? record.value() : preProcessValue(record.value(), record.valueSchema(), schema);

		if (value instanceof Struct)
			index = index + "-" + getRollOverSubfix(schema, value);

		byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);
		return new String(rawJsonPayload, StandardCharsets.UTF_8);
	}

  // We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
  // expects a different JSON format from the current JSON converter provides. Rather than
  // completely rewrite a converter for Elasticsearch, we will refactor the JSON converter to
  // support customized translation. The pre process is no longer needed once we have the JSON
	/**
	 * Derives the daily roll-over suffix for the target index from the record's
	 * "properties.date" field, which is expected to be an ISO-8601 string (e.g.
	 * "2019-01-31T12:00:00.000Z").
	 *
	 * @param schema the (pre-processed) value schema
	 * @param value  the corresponding Struct value
	 * @return the date portion before 'T' (yyyy-MM-dd), or "" when the field is
	 *         absent, null, or not in the expected format
	 */
	private String getRollOverSubfix(Schema schema, Object value) {
		Struct struct = (Struct) value;

		if (schema.field("properties") == null) {
			return "";
		}

		// The "properties" field can exist in the schema while its value is null;
		// the original getStruct(...).getString(...) chain threw an NPE in that case.
		Struct properties = struct.getStruct("properties");
		if (properties == null) {
			return "";
		}

		String date = properties.getString("date");
		if (date == null || !date.contains("T")) {
			return "";
		}
		return date.split("T")[0];
	}

	// We need to pre process the Kafka Connect schema before converting to JSON as
	// Elasticsearch
	// expects a different JSON format from the current JSON converter provides.
	// Rather than
	// completely rewrite a converter for Elasticsearch, we will refactor the JSON
	// converter to
	// support customized translation. The pre process is no longer needed once we
	// have the JSON
	// converter refactored.
	// visible for testing
	Schema preProcessSchema(Schema schema) {
@@ -239,16 +255,31 @@ public class DataConverter {
			return copySchemaBasics(schema, result).build();
		}
		Schema elementSchema = SchemaBuilder.struct().name(keyName + "-" + valueName)
        .field(MAP_KEY, preprocessedKeySchema)
        .field(MAP_VALUE, preprocessedValueSchema)
        .build();
				.field(MAP_KEY, preprocessedKeySchema).field(MAP_VALUE, preprocessedValueSchema).build();
		return copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build();
	}

	/**
	 * Builds the Elasticsearch-friendly schema for a struct, overriding a few
	 * well-known GeoJSON fields:
	 * <ul>
	 * <li>"geometry" — mapped to a {type, coordinates[]} struct so it is emitted
	 * as a GeoJSON object instead of a string;</li>
	 * <li>"date", "inserted", "updated" — emitted as formatted date strings.</li>
	 * </ul>
	 * All other fields are pre-processed recursively.
	 */
	private Schema preProcessStructSchema(Schema schema) {
		SchemaBuilder builder = copySchemaBasics(schema, SchemaBuilder.struct().name(schema.name()));
		for (Field field : schema.fields()) {
			// Renamed from "preProcessSchema", which shadowed the method of the same name.
			Schema fieldSchema;
			switch (field.name()) {
			case "geometry":
				fieldSchema = SchemaBuilder.struct().field("type", Schema.STRING_SCHEMA)
						.field("coordinates", SchemaBuilder.array(Schema.FLOAT64_SCHEMA).schema()).build();
				break;
			case "date":
			case "inserted":
			case "updated":
				fieldSchema = Schema.STRING_SCHEMA;
				break;
			default:
				fieldSchema = preProcessSchema(field.schema());
				break;
			}

			builder.field(field.name(), fieldSchema);
		}
		return builder.build();
	}
@@ -339,10 +370,8 @@ public class DataConverter {
		if (useCompactMapEntries && keySchema.type() == Schema.Type.STRING) {
			Map<Object, Object> processedMap = new HashMap<>();
			for (Map.Entry<?, ?> entry : map.entrySet()) {
        processedMap.put(
            preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()),
            preProcessValue(entry.getValue(), valueSchema, newValueSchema)
        );
				processedMap.put(preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()),
						preProcessValue(entry.getValue(), valueSchema, newValueSchema));
			}
			return processedMap;
		}
@@ -361,18 +390,60 @@ public class DataConverter {
	/**
	 * Converts a Struct value to its pre-processed counterpart, with special
	 * handling for a few well-known fields:
	 * <ul>
	 * <li>"geometry" — a GeoJSON string containing "Point" is parsed into a
	 * {type, coordinates} Struct. NOTE(review): any other geometry type leaves
	 * {@code converted} null, and that null is then put into the new Struct,
	 * which fails for a non-optional field schema — confirm non-Point geometries
	 * are not expected here;</li>
	 * <li>"date" — re-formatted as a UTC string using {@link #dateFormat};</li>
	 * <li>"inserted"/"updated" — both stamped with the current time on every
	 * conversion (so "inserted" is overwritten on re-processing — presumably
	 * intentional, verify against the indexing strategy).</li>
	 * </ul>
	 * Everything else goes through the generic preProcessValue path.
	 */
	private Object preProcessStructValue(Object value, Schema schema, Schema newSchema) {
		Struct struct = (Struct) value;
		Struct newStruct = new Struct(newSchema);

		// Single timestamp so "inserted" and "updated" agree within one record.
		DateTime insertedDate = DateTime.now();

		for (Field field : schema.fields()) {
			Schema newFieldSchema = newSchema.field(field.name()).schema();

			Object converted = null;

			switch (field.name()) {
			case "geometry":
				if (struct.getString(field.name()).contains("Point")) {
					converted = getPointStructFromString(newSchema.field("geometry").schema(), struct.get(field));
				}
				break;
			case "date":
				converted = new DateTime(struct.get(field), DateTimeZone.UTC).toString(dateFormat);
				break;
			case "inserted":
			case "updated":
				converted = insertedDate.toString(dateFormat);
				break;
			default:
				converted = preProcessValue(struct.get(field), field.schema(), newFieldSchema);
				break;
			}

			newStruct.put(field.name(), converted);
		}
		return newStruct;
	}

	/**
	 * Parses a GeoJSON "Point" serialized as a JSON string into a Connect Struct
	 * with "type" and "coordinates" fields matching the pre-processed geometry
	 * schema.
	 *
	 * @param newSchema the target geometry schema ({type, coordinates[]})
	 * @param value     the raw geometry value; its toString() must be valid JSON
	 * @return the populated geometry Struct
	 * @throws DataException if the geometry string cannot be parsed as JSON
	 */
	@SuppressWarnings("unchecked")
	private Struct getPointStructFromString(Schema newSchema, Object value) {

		ObjectMapper mapper = new ObjectMapper();

		String coordinatesField = "coordinates";

		try {
			Map<String, Object> json = mapper.readValue(value.toString(), Map.class);

			Struct result = new Struct(newSchema.schema());
			result.put("type", "Point");
			result.put(coordinatesField, mapper.convertValue(json.get(coordinatesField), ArrayList.class));
			return result;

		} catch (IOException e) {
			// Don't swallow the failure: the original printStackTrace() + return null
			// silently produced a null geometry. Surface it as a DataException (the
			// standard Connect conversion error) with the cause preserved.
			throw new DataException("Failed to parse GeoJSON geometry: " + value, e);
		}
	}

	public enum BehaviorOnNullValues {
    IGNORE,
    DELETE,
    FAIL;
		IGNORE, DELETE, FAIL;

		public static final BehaviorOnNullValues DEFAULT = IGNORE;

@@ -388,7 +459,8 @@ public class DataConverter {
				validator.ensureValid(name, value);
			}

      // Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly
			// Overridden here so that ConfigDef.toEnrichedRst shows possible values
			// correctly
			@Override
			public String toString() {
				return validator.toString();
+309 −312

File changed.

Preview size limit exceeded, changes collapsed.