Commit 357a02e3 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade control de dependencias de QC

Añade funcionalidad para controlar las dependencias entre parámetros a la
hora de pasar el control de calidad.
parent 5316571b
Loading
Loading
Loading
Loading
+107 −50
Original line number Diff line number Diff line
@@ -12,60 +12,90 @@ if (flowFile == null) {
	return;
}

// CONSTANTS
dateFormat = "yyyy-mm-dd'T'HH:mm:ss.SSS"
realTimeVFlag = "R"
goodQFlag = "1"
probablyBadQFlag = "3"
badQFlag = "4"
// CONSTANTS SCRIPT
DATE_FORMAT = "yyyy-mm-dd'T'HH:mm:ss.SSS"
REAL_TIME_VFLAG = "R"
DELAYED_VFLAG = "D"
GOOD_QFLAG = "1"
PROBABLY_BAD_QFLAG = "3"
BAD_QFLAG = "4"
QC_SUFFIX = "_qc"
// properties
DATA_DEFINITION_PROPERTY = "dataDefinition"
VALUE_PROPERTY = "value"
QFLAG_PROPERTY = "qFlag"
VFLAG_PROPERTY = "vFlag"
ID_PROPERTY = "id"
DATE_PROPERTY = "date"
// config
TRANSFORM_CONFIG_PROPERTY = "transform"
DECIMAL_PLACES_CONFIG_PROPERTY = "decimalPlaces"
QC_DEPENDENCIES_CONFIG_PROPERTY = "qCDependencies"
UPPER_LIMIT_CONFIG_PROPERTY = "upperLimit"
LOWER_LIMIT_CONFIG_PROPERTY = "lowerLimit"
UPPER_TOLERANCE_CONFIG_PROPERTY = "upperTolerance"
LOWER_TOLERANCE_CONFIG_PROPERTY = "lowerTolerance"
// common
ACTIVITY_ID_PROPERTY = "activityId"

// input Data

flowFile = session.write(flowFile, { inputStream, outputStream ->

	def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
	def inJson = new JsonSlurper().parseText(content)
	def data = new JsonSlurper().parseText(content)

	// Read config from file
	// Obtiene el fichero de configuración en formato json
	def config = getConfigFile(flowFile)

	def sensors = inJson["sensors"]
	// Procesa y transforma los datos de los sensores en base al fichero de configuración
	processSensorData(data, config)

	outputStream.write(JsonOutput.toJson(data).toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

def processSensorData(data, config) {

	def sensors = data["sensors"]
	def delayedQC = data["qC"]
	def toRemove = []
	def qCFlags = [:]

	for (sensor in sensors) {

		def dataDefinition = sensor["dataDefinition"]
		def dataDefinition = sensor[DATA_DEFINITION_PROPERTY]
		def sensorConfig = config[dataDefinition]

		if (sensorConfig != null) {

			def value = sensor["value"]
			def value = sensor[VALUE_PROPERTY]

			// unit transform
			def transform = sensorConfig["transform"]
			def transform = sensorConfig[TRANSFORM_CONFIG_PROPERTY]
			if (transform != null) {
				value = transformValue(value, transform)
			}

			// trunc
			def decimalPlaces = sensorConfig["decimalPlaces"]
			def decimalPlaces = sensorConfig[DECIMAL_PLACES_CONFIG_PROPERTY]
			if (decimalPlaces != null) {
				value = truncValue(value, decimalPlaces)
			}

			// Set activity id from config
			sensor["activityId"] = config["activityId"]
			sensor[ACTIVITY_ID_PROPERTY] = config[ACTIVITY_ID_PROPERTY]

			sensor["value"] = value
			sensor[VALUE_PROPERTY] = value

			// qualityControl
			sensor["qFlag"] = qualityControl(value, sensorConfig)
			sensor["vFlag"] = realTimeVFlag
			qualityControl(sensor, delayedQC, dataDefinition, value, sensorConfig)

			sensor[DATA_DEFINITION_PROPERTY] = config[dataDefinition].dataDefinition

			sensor["dataDefinition"] = config[dataDefinition].dataDefinition
			// Add QFlag to new struct, index by dataDefinition
			qCFlags.put(sensor[DATA_DEFINITION_PROPERTY], sensor[QFLAG_PROPERTY])

			// Generate identifier
			sensor["id"] = generateIdentifier(config["activityId"], sensor["dataDefinition"], sensor["date"])
			sensor[ID_PROPERTY] = generateIdentifier(config[ACTIVITY_ID_PROPERTY], sensor[DATA_DEFINITION_PROPERTY], sensor[DATE_PROPERTY])
		}
		else {
			toRemove.add(sensor)
@@ -73,32 +103,12 @@ flowFile = session.write(flowFile, { inputStream, outputStream ->
	}

	sensors.removeAll { it in toRemove }
	outputStream.write(JsonOutput.toJson(inJson).toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

def getConfigFile(flowFile) {

	def attrName = "configFilePath"

	def attr = null
	// Read config from variables
	try {
		attr = binding.getVariable(attrName).evaluateAttributeExpressions(flowFile).value
	}
	catch(Exception ex) {}

	if (attr == null) // if is null, read config from attributes
		attr = flowFile.getAttribute(attrName)

	def configFile = new File(attr).getText("UTF-8")

	return new JsonSlurper().parseText(configFile)
	checkQCDependencies(sensors, qCFlags, config[QC_DEPENDENCIES_CONFIG_PROPERTY])
}

def generateIdentifier(activityId, dataDefinition, dateTime) {
	def time = Date.parse(dateFormat, dateTime)
	def time = Date.parse(DATE_FORMAT, dateTime)
	return activityId + "-" + dataDefinition + "-" + time.getTime();
}

@@ -110,21 +120,68 @@ def truncValue(double sensorValue, decimalPlaces) {
	return sensorValue.trunc(decimalPlaces)
}

def qualityControl(value, sensorConfig) {
def qualityControl(sensor, delayedQC, dataDefinition, value, sensorConfig) {

	// qualityControl
	def delayedQFlag = (delayedQC != null ? delayedQC[dataDefinition + QC_SUFFIX] : null)
	if (delayedQFlag == null) { // Si no llega con control de calidad
		sensor[QFLAG_PROPERTY] = getQFlag(value, sensorConfig)
		sensor[VFLAG_PROPERTY] = REAL_TIME_VFLAG
	} else {
		sensor[VFLAG_PROPERTY] = DELAYED_VFLAG
		if (delayedQFlag == GOOD_QFLAG) { // Si el valor es considerado bueno se vuelve a pasar el control
			sensor[QFLAG_PROPERTY] = getQFlag(value, sensorConfig)
		} else { // Si el valor no es considerado bueno se respeta el control de calidad recibido
			sensor[QFLAG_PROPERTY] = delayedQFlag
		}
	}
}

def getQFlag(value, sensorConfig) {

	def upperLimit = sensorConfig["upperLimit"]
	def lowerLimit = sensorConfig["lowerLimit"]
	def upperTolerance = sensorConfig["upperTolerance"]
	def lowerTolerance = sensorConfig["lowerTolerance"]
	def upperLimit = sensorConfig[UPPER_LIMIT_CONFIG_PROPERTY]
	def lowerLimit = sensorConfig[LOWER_LIMIT_CONFIG_PROPERTY]
	def upperTolerance = sensorConfig[UPPER_TOLERANCE_CONFIG_PROPERTY]
	def lowerTolerance = sensorConfig[LOWER_TOLERANCE_CONFIG_PROPERTY]

	def qFlag = badQFlag
	def qFlag = BAD_QFLAG

	if ((lowerLimit <=  value) && (value <= upperLimit)) {
		qFlag = goodQFlag
		qFlag = GOOD_QFLAG
	}
	else if ((value >= lowerLimit - lowerTolerance) || ( value <= upperLimit + upperTolerance )) {
		qFlag = probablyBadQFlag
		qFlag = PROBABLY_BAD_QFLAG
	}

	return qFlag;
}


def checkQCDependencies(sensors, qCFlags, qCDependencies) {

	for (sensor in sensors) {
		def dd = sensor[DATA_DEFINITION_PROPERTY]
		if (qCDependencies.containsKey(dd)) {

			def dependencyQflags = qCDependencies.get(dd)
			String qFlag = sensor[QFLAG_PROPERTY]

			for (dependency in dependencyQflags) {
				String dependencyQFlag = qCFlags.get(dependency)
				qFlag = getRealQFlag(qFlag, dependencyQFlag)
			}
			sensor[QFLAG_PROPERTY] = qFlag
		}
	}
}

def getRealQFlag(source, dependency) {

	if (BAD_QFLAG.equals(source) || BAD_QFLAG.equals(dependency)) {
		return BAD_QFLAG
	}
	if (PROBABLY_BAD_QFLAG.equals(source) || PROBABLY_BAD_QFLAG.equals(dependency)) {
		return PROBABLY_BAD_QFLAG
	}
	return source;
}
 No newline at end of file