Commit e86f86ab authored by Noel Alonso's avatar Noel Alonso
Browse files

No crea el mapping si el index no es el original

Si el index ha sido modificado significa que se crea en base al campo
date. Para ello es indispensable tener un template aplicado para estos
íncides, por lo que no se debe crear el mapping.
parent 287e671e
Loading
Loading
Loading
Loading
+27 −7
Original line number Diff line number Diff line
@@ -55,6 +55,8 @@ public class DataConverter {

	private static final Converter JSON_CONVERTER;

	private final String PAYLOAD_KEY = "payload", INDEX_SUFFIX_KEY = "indexSuffix";

	public static final String dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ";

	static {
@@ -159,36 +161,54 @@ public class DataConverter {
			id = convertKey(record.keySchema(), record.key());
		}

		final String payload = getPayload(record, ignoreSchema, index);
		HashMap<String, String> result = getPayloadAndIndexSuffix(record, ignoreSchema);

		String payload = null;

		if (result != null && result.containsKey(PAYLOAD_KEY)) {
			payload = result.get(PAYLOAD_KEY);
		}

		if (result != null && result.containsKey(INDEX_SUFFIX_KEY)) {
			String indexSuffix = result.get(INDEX_SUFFIX_KEY);
			if (indexSuffix != null)
				index = index + "-" + indexSuffix;
		}

		final Long version = ignoreKey ? null : record.kafkaOffset();

		System.out.println(".............. " + index + " " + type + " " + id);
		return new IndexableRecord(new Key(index, type, id), payload, version);
	}

	private String getPayload(SinkRecord record, boolean ignoreSchema, String index) {
	private HashMap<String, String> getPayloadAndIndexSuffix(SinkRecord record, boolean ignoreSchema) {
		if (record.value() == null) {
			return null;
		}

		HashMap<String, String> result = new HashMap<String, String>();

		Schema schema = ignoreSchema ? record.valueSchema() : preProcessSchema(record.valueSchema());

		Object value = ignoreSchema ? record.value() : preProcessValue(record.value(), record.valueSchema(), schema);

		if (value instanceof Struct)
			index = index + "-" + getRollOverSubfix(schema, value);
			result.put(INDEX_SUFFIX_KEY, getRollOverSuffix(schema, value));

		byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);
		return new String(rawJsonPayload, StandardCharsets.UTF_8);
		result.put(PAYLOAD_KEY, new String(rawJsonPayload, StandardCharsets.UTF_8));

		return result;
	}

	private String getRollOverSubfix(Schema schema, Object value) {
	private String getRollOverSuffix(Schema schema, Object value) {

		Struct struct = (Struct) value;

		if (schema.schema().field("properties") == null)
			return "";
			return null;

		String date = struct.getStruct("properties").getString("date");
		String date = struct.getStruct("properties").getString("date").toString();

		if (date == null || !date.contains("T"))
			return "";
+14 −13
Original line number Diff line number Diff line
@@ -254,19 +254,6 @@ public class ElasticsearchWriter {
      final boolean ignoreSchema =
          ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;

      if (!ignoreSchema && !existingMappings.contains(index)) {
        try {
          if (Mapping.getMapping(client, index, type) == null) {
            Mapping.createMapping(client, index, type, sinkRecord.valueSchema());
          }
        } catch (IOException e) {
          // FIXME: concurrent tasks could attempt to create the mapping and one of the requests may
          // fail
          throw new ConnectException("Failed to initialize mapping for index: " + index, e);
        }
        existingMappings.add(index);
      }

      tryWriteRecord(sinkRecord, index, ignoreKey, ignoreSchema);
    }
  }
@@ -288,6 +275,20 @@ public class ElasticsearchWriter {
          type,
          ignoreKey,
          ignoreSchema);
      
      if (record != null && record.key != null && index.equals(record.key.index) && !ignoreSchema && !existingMappings.contains(record.key.index)) {
          try {
            if (Mapping.getMapping(client, record.key.index, type) == null) {
              Mapping.createMapping(client, record.key.index, type, sinkRecord.valueSchema());
            }
          } catch (IOException e) {
            // FIXME: concurrent tasks could attempt to create the mapping and one of the requests may
            // fail
            throw new ConnectException("Failed to initialize mapping for index: " + record.key.index, e);
          }
          existingMappings.add(record.key.index);
      }
      
      if (record != null) {
        bulkProcessor.add(record, flushTimeoutMs);
      }