Commit 3485c9c1 authored by Noel Alonso's avatar Noel Alonso
Browse files

Separa el envío de datos de AIS en tres topics

De este modo se envían los datos ya transformados al topic
correspondiente.

Adapta test
parent 989e3a58
Loading
Loading
Loading
Loading
+45 −12
Original line number Diff line number Diff line
@@ -12,16 +12,19 @@ import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import es.redmic.ais.exceptions.InvalidUsernameException;
import es.redmic.brokerlib.avro.geodata.tracking.vessels.AISTrackingDTO;
import es.redmic.brokerlib.avro.common.CommonDTO;
import es.redmic.brokerlib.listener.SendListener;
import es.redmic.exception.custom.ResourceNotFoundException;
import es.redmic.utils.compressor.Zip;
import es.redmic.utils.csv.DataLoaderIngestData;
import es.redmic.vesselslib.dto.ais.AISTrackingDTO;
import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.utils.VesselTrackingUtil;
import es.redmic.vesselslib.utils.VesselUtil;

@Service
public class AISService {
@@ -35,14 +38,17 @@ public class AISService {

	protected static Logger logger = LogManager.getLogger();

	@Value("${broker.topic.realtime.tracking.vessels.key.prefix}")
	private String prefix;

	@Value("${aishub.service.url}")
	private String urlAIS;

	@Value("${broker.topic.realtime.tracking.vessels}")
	private String TOPIC;
	private String VESSEL_TRACKING_TOPIC;

	@Value("${broker.topic.realtime.ais}")
	private String AIS_TOPIC;

	@Value("${broker.topic.realtime.vessels}")
	private String VESSEL_TOPIC;

	@Value("${file.delimiter.csv}")
	private String delimiterCSV;
@@ -53,8 +59,20 @@ public class AISService {
	private String nameCompressFile = "ais.zip";
	private String nameFile = "data.csv";

	@Value("${qflag.default}")
	private String QFLAG_DEFAULT;

	@Value("${vflag.default}")
	private String VFLAG_DEFAULT;

	@Value("${vesseltracking-activity-id}")
	protected String activityId;

	@Autowired
	private KafkaTemplate<String, AISTrackingDTO> aisTemplate;

	@Autowired
	private KafkaTemplate<String, AISTrackingDTO> kafkaTemplate;
	private KafkaTemplate<String, CommonDTO> vesselTemplate;

	public void fetchData() {

@@ -129,12 +147,27 @@ public class AISService {
		publishToKafka(dto);
	}

	private void publishToKafka(AISTrackingDTO dto) {
	private void publishToKafka(AISTrackingDTO aisTracking) {

		// @formatter:off
		String vesselId = VesselUtil.generateId(aisTracking.getMmsi()),
				vesselTrackingId = VesselTrackingUtil.generateId(aisTracking.getMmsi(), aisTracking.getTstamp().getMillis());
		// @formatter:on

		// Envía dto de datos brutos para sink de postgresql
		aisTemplate.send(AIS_TOPIC, vesselId, aisTracking).addCallback(new SendListener());

		VesselTrackingDTO tracking = VesselTrackingUtil.convertTrackToVesselTracking(aisTracking, QFLAG_DEFAULT,
				VFLAG_DEFAULT, activityId);

		// Envía dto de tracking para procesarlo + sink

		vesselTemplate.send(VESSEL_TRACKING_TOPIC, vesselTrackingId, tracking).addCallback(new SendListener());

		ListenableFuture<SendResult<String, AISTrackingDTO>> future = kafkaTemplate.send(TOPIC,
				prefix + dto.getMmsi().toString(), dto);
		VesselDTO vessel = tracking.getProperties().getVessel();

		future.addCallback(new SendListener());
		// Envía dto de vessel para procesarlo
		vesselTemplate.send(VESSEL_TOPIC, vesselId, vessel).addCallback(new SendListener());
	}

	private void removeZipFile() {
+12 −2
Original line number Diff line number Diff line
@@ -19,6 +19,11 @@ logging.level.es.redmic.brokerlib=error
spring.mvc.locale-resolver=fixed
spring.mvc.locale=es_ES

vesseltracking-activity-id=999

qflag.default=0
vflag.default=N

aishub.service.username=aishub.service.username
aishub.service.url=${aishub.service.url}

@@ -41,10 +46,15 @@ broker.topic.alert=alert
alert.email=info@redmic.es

#Broker
#topic del broker para enviar nuevos tracking de barcos
#topic del broker para enviar tracking de barcos
broker.topic.realtime.tracking.vessels=realtime.tracking.vessels

broker.topic.realtime.tracking.vessels.key.prefix=vessel-mmsi-
#topic del broker para enviar datos brutos obtenidos de ais
broker.topic.realtime.ais=realtime.ais

#topic del broker para enviar barcos
broker.topic.realtime.vessels=realtime.vessels


config.fixedDelay=${config.fixedDelay}
app.scheduling.enable=${app.scheduling.enable}
+45 −7
Original line number Diff line number Diff line
@@ -29,9 +29,11 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import es.redmic.ais.AISApplication;
import es.redmic.ais.exceptions.InvalidUsernameException;
import es.redmic.ais.service.AISService;
import es.redmic.brokerlib.avro.geodata.tracking.vessels.AISTrackingDTO;
import es.redmic.exception.custom.ResourceNotFoundException;
import es.redmic.testutils.kafka.KafkaBaseIntegrationTest;
import es.redmic.vesselslib.dto.ais.AISTrackingDTO;
import es.redmic.vesselslib.dto.tracking.VesselTrackingDTO;
import es.redmic.vesselslib.dto.vessel.VesselDTO;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = { AISApplication.class })
@@ -49,7 +51,13 @@ public class AISServiceTest extends KafkaBaseIntegrationTest {
	@ClassRule
	public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1);

	protected BlockingQueue<Object> blockingQueue;
	// @formatter:off

	protected BlockingQueue<Object> blockingQueueVesselTracking,
		blockingQueueAIS,
		blockingQueueVessel;

	// @formatter:on

	@PostConstruct
	public void AISServiceTestPostConstruct() throws Exception {
@@ -60,7 +68,9 @@ public class AISServiceTest extends KafkaBaseIntegrationTest {
	@Before
	public void setup() {

		blockingQueue = new LinkedBlockingDeque<>();
		blockingQueueVesselTracking = new LinkedBlockingDeque<>();
		blockingQueueAIS = new LinkedBlockingDeque<>();
		blockingQueueVessel = new LinkedBlockingDeque<>();
	}

	@Test(expected = ResourceNotFoundException.class)
@@ -84,8 +94,14 @@ public class AISServiceTest extends KafkaBaseIntegrationTest {

		Thread.sleep(500);

		// Espera que se publiquen 23016 registros a kafka
		assertEquals(23016, blockingQueue.size());
		// Espera que se publiquen 23016 registros al topic de vesselTracking
		assertEquals(23016, blockingQueueVesselTracking.size());

		// Espera que se publiquen 23016 registros al topic de ais
		assertEquals(23016, blockingQueueAIS.size());

		// Espera que se publiquen 23016 registros al topic de vessel
		assertEquals(23016, blockingQueueVessel.size());
	}

	@Test(expected = InvalidUsernameException.class)
@@ -117,7 +133,19 @@ public class AISServiceTest extends KafkaBaseIntegrationTest {
	}

	@KafkaListener(topics = "${broker.topic.realtime.tracking.vessels}")
	public void updateVessels(AISTrackingDTO dto) {
	public void vesselTracking(VesselTrackingDTO dto) {

		assertNotNull(dto);
		assertNotNull(dto.getGeometry());
		assertNotNull(dto.getProperties().getDate());
		assertNotNull(dto.getProperties().getVessel().getMmsi());
		assertNotNull(dto.getProperties().getVessel().getType().getCode());

		blockingQueueVesselTracking.offer(dto);
	}

	@KafkaListener(topics = "${broker.topic.realtime.ais}")
	public void ais(AISTrackingDTO dto) {

		assertNotNull(dto);
		assertNotNull(dto.getMmsi());
@@ -126,6 +154,16 @@ public class AISServiceTest extends KafkaBaseIntegrationTest {
		assertNotNull(dto.getTstamp());
		assertNotNull(dto.getType());

		blockingQueue.offer(dto);
		blockingQueueAIS.offer(dto);
	}

	@KafkaListener(topics = "${broker.topic.realtime.vessels}")
	public void vessels(VesselDTO dto) {

		assertNotNull(dto);
		assertNotNull(dto.getMmsi());
		assertNotNull(dto.getType());

		blockingQueueVessel.offer(dto);
	}
}
 No newline at end of file