Commit a8f0fbfe authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade filtrado por bbox

De esta manera solo se procesan los tracks que estén dentro de este bbox
Al topic de ais (datos brutos) si se envían todos los tracks
Se adapta el test
parent ca13ddb7
Loading
Loading
Loading
Loading
+39 −7
Original line number Diff line number Diff line
@@ -6,9 +6,13 @@ import java.net.URL;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Envelope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
@@ -74,6 +78,26 @@ public class AISService {
	@Autowired
	private KafkaTemplate<String, CommonDTO> vesselTemplate;

	@Value("${bboxFilter.bottomRightLat}")
	private double bottomRightLat; // = 26

	@Value("${bboxFilter.bottomRightLon}")
	private double bottomRightLon; // = -10,

	@Value("${bboxFilter.topLeftLat}")
	private double topLeftLat; // = 30,

	@Value("${bboxFilter.topLeftLon}")
	private double topLeftLon; // = -21;

	Envelope envelopeJts;

	@PostConstruct
	private void aisServicePostConstruct() {

		envelopeJts = new Envelope(bottomRightLon, topLeftLon, topLeftLat, bottomRightLat);
	}

	// @formatter:off
	
	private long maxDateBefore = -1,
@@ -185,6 +209,9 @@ public class AISService {
		// Envía dto de datos brutos para sink de postgresql
		aisTemplate.send(AIS_TOPIC, vesselId, aisTracking).addCallback(new SendListener());

		// Si el punto está en la zona de interés
		if (pointInBbox(aisTracking.getLongitude(), aisTracking.getLatitude())) {

			VesselTrackingDTO tracking = VesselTrackingUtil.convertTrackToVesselTracking(aisTracking, QFLAG_DEFAULT,
					VFLAG_DEFAULT, activityId);

@@ -197,6 +224,11 @@ public class AISService {
			// Envía dto de vessel para procesarlo
			vesselTemplate.send(VESSEL_TOPIC, vesselId, vessel).addCallback(new SendListener());
		}
	}

	private boolean pointInBbox(Double x, Double y) {
		return envelopeJts.contains(new Coordinate(x, y));
	}

	private void removeZipFile() {

+6 −0
Original line number Diff line number Diff line
@@ -24,6 +24,12 @@ vesseltracking-activity-id=999
qflag.default=0
vflag.default=N

#bbox filter
bboxFilter.bottomRightLat=26
bboxFilter.bottomRightLon=-10
bboxFilter.topLeftLat=30
bboxFilter.topLeftLon=-21

aishub.service.username=aishub.service.username
aishub.service.url=${aishub.service.url}

+11 −6
Original line number Diff line number Diff line
@@ -105,17 +105,22 @@ public class AISServiceTest extends KafkaBaseIntegrationTest {
		// Espera un tiempo para que pueda terminar de procesar los dos ficheros
		Thread.sleep(23000);

		int numOfItems = 41998; // Debería procesar 41976 pero repite 36 elementos que llegan en el segundo
								// fichero con el mismo tstamp
		// @formatter:off

		int numOfItems = 41998, // Debería procesar 41976 pero repite 36 elementos que llegan en el segundo
				numOfItemsInBbox = 86; 
		// @formatter:on

		// Espera que se publiquen numOfItems registros al topic de vesselTracking
		assertEquals(numOfItems, blockingQueueVesselTracking.size());
		// fichero con el mismo tstamp

		// Espera que se publiquen numOfItems registros al topic de ais
		assertEquals(numOfItems, blockingQueueAIS.size());

		// Espera que se publiquen numOfItems registros al topic de vessel
		assertEquals(numOfItems, blockingQueueVessel.size());
		// Espera que se publiquen numOfItemsInBbox registros al topic de vesselTracking
		assertEquals(numOfItemsInBbox, blockingQueueVesselTracking.size());

		// Espera que se publiquen numOfItemsInBbox registros al topic de vessel
		assertEquals(numOfItemsInBbox, blockingQueueVessel.size());
	}

	@Test(expected = InvalidUsernameException.class)