Commit 91941517 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade algoritmo para descartar datos ya procesados

De un fichero a otro vienen datos repetidos (por la forma en que trabaja
aishub). Mediante este algoritmo, se descartan datos de fechas ya
procesadas.

Añade test
parent bb4e8b17
Loading
Loading
Loading
Loading
+29 −1
Original line number Diff line number Diff line
@@ -74,6 +74,13 @@ public class AISService {
	@Autowired
	private KafkaTemplate<String, CommonDTO> vesselTemplate;

	// @formatter:off
	
	private long maxDateBefore = -1,
			maxDateCurrent = -1;
	
	// @formatter:on

	public void fetchData() {

		prepareFile();
@@ -120,6 +127,9 @@ public class AISService {
			processRow(row);
		}

		maxDateBefore = maxDateCurrent;
		maxDateCurrent = -1;

		file.delete();
	}

@@ -144,8 +154,26 @@ public class AISService {

		dto.buildFromMap(row);

		if (dataFulfillConstraints(dto)) {
			publishToKafka(dto);
		}
	}

	private boolean dataFulfillConstraints(AISTrackingDTO dto) {

		if (dto.getMmsi() == null && dto.getTstamp() == null) {
			return false;
		}

		if (dto.getTstamp().getMillis() < maxDateBefore) {
			return false;
		}

		if (dto.getTstamp().getMillis() > maxDateCurrent) {
			maxDateCurrent = dto.getTstamp().getMillis();
		}
		return true;
	}

	private void publishToKafka(AISTrackingDTO aisTracking) {

+24 −10
Original line number Diff line number Diff line
@@ -39,7 +39,7 @@ import es.redmic.vesselslib.dto.vessel.VesselDTO;
@SpringBootTest(classes = { AISApplication.class })
@ActiveProfiles("test")
@TestPropertySource(properties = { "schema.registry.port=18081" })
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
@DirtiesContext(classMode = ClassMode.BEFORE_EACH_TEST_METHOD)
public class AISServiceTest extends KafkaBaseIntegrationTest {

	@Autowired
@@ -82,26 +82,40 @@ public class AISServiceTest extends KafkaBaseIntegrationTest {
	@Test
	public void processFile_ShouldSendMessageToKafka_IfDataIsCorrect() throws Exception {

		// Coloca el primer fichero de test en el lugar correspondiente
		File srcFile = new File("src/test/resources/ais.zip");

		File destFile = new File(directoryPath + "/ais.zip");

		FileUtils.copyFile(srcFile, destFile);

		// Invoca los métodos como si se hubiera ejecutado desde el schedule
		Whitebox.invokeMethod(aisService, "unzip");
		Whitebox.invokeMethod(aisService, "processFile");

		// Coloca el segundo fichero de test en el lugar correspondiente
		File srcFile2 = new File("src/test/resources/ais2.zip");
		File destFile2 = new File(directoryPath + "/ais2.zip");
		FileUtils.copyFile(srcFile2, destFile2);

		// Cambia nombre dentro del servicio e invoca los métodos de nuevo para procesar
		// el segundo fichero
		Whitebox.setInternalState(aisService, "nameCompressFile", "ais2.zip");
		Whitebox.invokeMethod(aisService, "unzip");
		Whitebox.invokeMethod(aisService, "processFile");

		Thread.sleep(500);
		// Espera un tiempo para que pueda terminar de procesar los dos ficheros
		Thread.sleep(23000);

		int numOfItems = 41998; // Debería procesar 41976 pero repite 36 elementos que llegan en el segundo
								// fichero con el mismo tstamp

		// Espera que se publiquen 23016 registros al topic de vesselTracking
		assertEquals(23016, blockingQueueVesselTracking.size());
		// Espera que se publiquen numOfItems registros al topic de vesselTracking
		assertEquals(numOfItems, blockingQueueVesselTracking.size());

		// Espera que se publiquen 23016 registros al topic de ais
		assertEquals(23016, blockingQueueAIS.size());
		// Espera que se publiquen numOfItems registros al topic de ais
		assertEquals(numOfItems, blockingQueueAIS.size());

		// Espera que se publiquen 23016 registros al topic de vessel
		assertEquals(23016, blockingQueueVessel.size());
		// Espera que se publiquen numOfItems registros al topic de vessel
		assertEquals(numOfItems, blockingQueueVessel.size());
	}

	@Test(expected = InvalidUsernameException.class)