Commit b44ba900 authored by Noel Alonso's avatar Noel Alonso
Browse files

Evita el uso de SpecificRecord

De esta manera se evitan problemas por el uso de clases no importadas
parent 1665876f
Loading
Loading
Loading
Loading
+18 −20
Original line number Diff line number Diff line
@@ -8,7 +8,6 @@ import java.util.Map;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
@@ -76,15 +75,15 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
		HashMapSerde<String, GenericRecord> hashMapSerde = new HashMapSerde<String, GenericRecord>(schemaRegistryClient,
				schemaRegistryUrl);

		KStream<String, SpecificRecord> areasStream = builder.stream(areasTopic,
				Consumed.with(null, getSpecificAvroSerde()));
		KStream<String, GenericRecord> areasStream = builder.stream(areasTopic,
				Consumed.with(null, getGenericAvroSerde()));

		KStream<String, SpecificRecord> pointsStream = builder.stream(pointsTopic,
				Consumed.with(null, getSpecificAvroSerde()));
		KStream<String, GenericRecord> pointsStream = builder.stream(pointsTopic,
				Consumed.with(null, getGenericAvroSerde()));

		KStream<String, SpecificRecord> lastAreasKStream = areasStream
				.groupByKey(Serialized.with(null, getSpecificAvroSerde()))
				.reduce((aggValue, newValue) -> newValue, Materialized.with(null, getSpecificAvroSerde())).toStream();
		KStream<String, GenericRecord> lastAreasKStream = areasStream
				.groupByKey(Serialized.with(null, getGenericAvroSerde()))
				.reduce((aggValue, newValue) -> newValue, Materialized.with(null, getGenericAvroSerde())).toStream();

		KStream<String, GenericRecord> areasKStreamEnriched = lastAreasKStream
				.flatMapValues((value) -> enrichAreaWithGeoHash(value)).selectKey((key, value) -> getGeoHashKey(value));
@@ -113,7 +112,7 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
		return v.get(GEO_HASH_KEY).toString();
	}

	private GenericRecord enrichPointWithGeoHash(SpecificRecord value) {
	private GenericRecord enrichPointWithGeoHash(GenericRecord value) {

		GenericRecord avroRecord = AvroUtils.getGenericRecordFromClass(SimplePoint.class);

@@ -134,16 +133,16 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
			return null;
		}
		// Se crea un nuevo registro con el geohash code y solo con la info necesaria
		avroRecord.put("mmsi", AvroUtils.getSpecificRecordProperty(value, "mmsi").toString());
		avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name").toString());
		avroRecord.put("dateTime", AvroUtils.getSpecificRecordProperty(value, "tstamp"));
		avroRecord.put("vesselType", AvroUtils.getSpecificRecordProperty(value, "type"));
		avroRecord.put("sog", AvroUtils.getSpecificRecordProperty(value, "sog"));
		avroRecord.put("mmsi", value.get("mmsi").toString());
		avroRecord.put("name", value.get("name").toString());
		avroRecord.put("dateTime", value.get("tstamp"));
		avroRecord.put("vesselType", value.get("type"));
		avroRecord.put("sog", value.get("sog"));

		return avroRecord;
	}

	private List<GenericRecord> enrichAreaWithGeoHash(SpecificRecord value) {
	private List<GenericRecord> enrichAreaWithGeoHash(GenericRecord value) {

		String geometry;
		try {
@@ -163,11 +162,10 @@ public class VesselRestrictionCheckerApplication extends StreamsApplicationBase
					// Se crean nuevos registros con el geohash code y solo con la info necesaria
					GenericRecord avroRecord = AvroUtils.getGenericRecordFromClass(SimpleArea.class);
					avroRecord.put(RESULT_GEOMETRY_PROPERTY, geometry.toString());
					avroRecord.put("id", AvroUtils.getSpecificRecordProperty(value, "id").toString());
					avroRecord.put("name", AvroUtils.getSpecificRecordProperty(value, "name").toString());
					avroRecord.put(VESSEL_TYPES_RESTRICTED_PROPERTY,
							AvroUtils.getSpecificRecordProperty(value, VESSEL_TYPES_RESTRICTED_PROPERTY));
					avroRecord.put(MAX_SPEED_PROPERTY, AvroUtils.getSpecificRecordProperty(value, MAX_SPEED_PROPERTY));
					avroRecord.put("id", value.get("id").toString());
					avroRecord.put("name", value.get("name").toString());
					avroRecord.put(VESSEL_TYPES_RESTRICTED_PROPERTY, value.get(VESSEL_TYPES_RESTRICTED_PROPERTY));
					avroRecord.put(MAX_SPEED_PROPERTY, value.get(MAX_SPEED_PROPERTY));
					avroRecord.put(GEO_HASH_KEY, geoHash);
					values.add(avroRecord);
				}
+0 −6
Original line number Diff line number Diff line
@@ -7,15 +7,9 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificRecord;

public class AvroUtils {

	public static Object getSpecificRecordProperty(SpecificRecord record, String name) {

		return record.get(record.getSchema().getField(name).pos());
	}

	public static HashMap<String, GenericRecord> aggregateGenericRecordInMap(String k, GenericRecord v,
			HashMap<String, GenericRecord> map, String aggregateKey) {
		map.put(v.get(aggregateKey).toString(), v);
+8 −7
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.generic.GenericRecord;
import org.apache.lucene.spatial.prefix.tree.CellIterator;
import org.apache.lucene.spatial.prefix.tree.GeohashPrefixTree;
import org.locationtech.spatial4j.context.SpatialContext;
@@ -93,7 +93,7 @@ public class GeoUtils {
	 * @throws InvalidShapeException
	 * @throws ParseException
	 */
	public static String getWKTGeometry(SpecificRecord value) throws InvalidShapeException, ParseException {
	public static String getWKTGeometry(GenericRecord value) throws InvalidShapeException, ParseException {

		// Comprueba si la geometría viene en los diferentes formatos soportados.

@@ -101,21 +101,22 @@ public class GeoUtils {

		if (schema.getField("longitude") != null && schema.getField("latitude") != null) {

			Object lon = AvroUtils.getSpecificRecordProperty(value, "longitude");
			Object lat = AvroUtils.getSpecificRecordProperty(value, "latitude");
			Object lon = value.get("longitude");
			Object lat = value.get("latitude");

			return getWKTFromLatLon((double) lat, (double) lon);
		}
		if (schema.getField("x") != null && schema.getField("y") != null) {

			Object lon = AvroUtils.getSpecificRecordProperty(value, "x");
			Object lat = AvroUtils.getSpecificRecordProperty(value, "y");
			Object lon = value.get("x");
			Object lat = value.get("y");

			return getWKTFromLatLon((double) lat, (double) lon);
		}
		// TODO: añadir compatibilidad con GeoJson
		if (schema.getField("geometry") != null) {

			return (String) AvroUtils.getSpecificRecordProperty(value, "geometry");
			return value.get("geometry").toString();
		}
		return null;
	}