Loading src/main/java/es/redmic/ais/service/AISService.java +57 −96 Original line number Diff line number Diff line Loading @@ -3,13 +3,12 @@ package es.redmic.ais.service; import java.io.File; import java.io.IOException; import java.net.URL; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; Loading @@ -17,121 +16,117 @@ import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import es.redmic.ais.exceptions.InvalidUsernameException; import es.redmic.brokerlib.avro.geodata.tracking.vessels.AISTrackingDTO; import es.redmic.brokerlib.listener.SendListener; import es.redmic.exception.custom.ResourceNotFoundException; import es.redmic.utils.compressor.Zip; import es.redmic.utils.csv.DataLoaderIngestData; @Service public class AISService { // @formatter:off private static final String INVALID_USERNAME_OR_PASSWORD = "Invalid username or password", TOO_FREQUENT_REQUESTS = "Too frequent requests"; // @formatter:on protected static Logger logger = LogManager.getLogger(); @Value("${aishub.service.url}") private String urlAIS; private String directoryPath = System.getProperty("user.dir") + "/target"; private String nameCompressFile = "ais.zip"; private String nameFile = "data.csv"; private String delimiterCSV = ","; @Value("${broker.topic.realtime.tracking.vessels}") private String TOPIC; @Value("${file.delimiter.csv}") private String delimiterCSV; @Value("${property.path.media_storage.AIS_TEMP}") private String directoryPath; private String nameCompressFile = "ais.zip"; private String nameFile = "data.csv"; @Autowired private KafkaTemplate<String, AISTrackingDTO> kafkaTemplate; public void update() { public void fetchData() { updateFile(); prepareFile(); processFile(); } public void updateFile() { private void prepareFile() { downloadData(); unzip(); } public void downloadData() { private void downloadData() { try { URL url = new URL(getUrlAIS()); URL url = new URL(urlAIS); File destination = new File(getCompressFilePath()); FileUtils.copyURLToFile(url, destination); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); throw new ResourceNotFoundException(); } } public void unzip() { private void unzip() { Zip zip = new Zip(); zip.extract(getCompressFilePath(), getDirectoryPath() + "/"); zip.extract(getCompressFilePath(), directoryPath + "/"); removeZipFile(); } public void processFile() { private void processFile() { File file = new File(getFilePath()); DataLoaderIngestData dataLoader = new DataLoaderIngestData(file, getDelimiterCSV()); DataLoaderIngestData dataLoader = new DataLoaderIngestData(file, delimiterCSV); checkFile(dataLoader.getHeader()); Map<String, String> row; while ((row = dataLoader.read()) != null) { processRow(row); } file.delete(); } public void processRow(Map<String, String> row) { private void checkFile(List<String> header) { AISTrackingDTO dto = new AISTrackingDTO(); if (header.size() == 1) { if (header.get(0).contains(INVALID_USERNAME_OR_PASSWORD)) { dto.setMmsi(parseInteger(row.get("MMSI"))); dto.setTstamp(DateTime.parse(row.get("TSTAMP"), DateTimeFormat.forPattern("yyy-MM-dd HH:mm:ss zzz"))); dto.setLatitude(parseDouble(row.get("LATITUDE"))); dto.setLongitude(parseDouble(row.get("LONGITUDE"))); dto.setCog(parseDouble(row.get("COG"))); dto.setSog(parseDouble(row.get("SOG"))); dto.setDraught(parseDouble(row.get("DRAUGHT"))); dto.setType(parseInteger(row.get("TYPE"))); dto.setImo(parseInteger(row.get("IMO"))); dto.setHeading(parseInteger(row.get("HEADING"))); dto.setNavStat(parseInteger(row.get("NAVSTAT"))); dto.setName(row.get("NAME")); dto.setDest(row.get("DEST")); dto.setCallSign(row.get("CALLSIGN")); dto.setEta(row.get("ETA")); dto.setA(parseDouble(row.get("A"))); dto.setB(parseDouble(row.get("B"))); dto.setC(parseDouble(row.get("C"))); dto.setD(parseDouble(row.get("D"))); logger.error("Error en el fichero. " + INVALID_USERNAME_OR_PASSWORD); throw new InvalidUsernameException(); } else if (header.get(0).contains(TOO_FREQUENT_REQUESTS)) { emitDTO(dto); logger.error("Error en el fichero. " + TOO_FREQUENT_REQUESTS); throw new InvalidUsernameException(); } public Double parseDouble(String value) { if (value != null) { return Double.parseDouble(value); } return null; } public Integer parseInteger(String value) { private void processRow(Map<String, String> row) { if (value != null) { return Integer.parseInt(value); } AISTrackingDTO dto = new AISTrackingDTO(); dto.buildFromMap(row); return null; publishToKafka(dto); } public void emitDTO(AISTrackingDTO dto) { private void publishToKafka(AISTrackingDTO dto) { logger.info("Tracking vessel: " + dto.getMmsi()); Loading @@ -141,53 +136,19 @@ public class AISService { future.addCallback(new SendListener()); } public String getFilePath() { return getDirectoryPath() + "/" + getNameFile(); } public String getCompressFilePath() { private void removeZipFile() { return getDirectoryPath() + "/" + getNameCompressFile(); File file = new File(getCompressFilePath()); file.delete(); } public String getUrlAIS() { return urlAIS; } public String getDirectoryPath() { return directoryPath; } public String getNameCompressFile() { return nameCompressFile; } private String getFilePath() { public String getNameFile() { return nameFile; return directoryPath + "/" + nameFile; } public void setNameFile(String nameFile) { this.nameFile = nameFile; } public void setNameCompressFile(String nameCompressFile) { this.nameCompressFile = nameCompressFile; } public void setDirectoryPath(String directoryPath) { this.directoryPath = directoryPath; } public void setUrlAIS(String urlAIS) { this.urlAIS = urlAIS; } public String getDelimiterCSV() { return delimiterCSV; } private String getCompressFilePath() { public void setDelimiterCSV(String delimiterCSV) { this.delimiterCSV = delimiterCSV; return directoryPath + "/" + nameCompressFile; } } Loading
src/main/java/es/redmic/ais/service/AISService.java +57 −96 Original line number Diff line number Diff line Loading @@ -3,13 +3,12 @@ package es.redmic.ais.service; import java.io.File; import java.io.IOException; import java.net.URL; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; Loading @@ -17,121 +16,117 @@ import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import es.redmic.ais.exceptions.InvalidUsernameException; import es.redmic.brokerlib.avro.geodata.tracking.vessels.AISTrackingDTO; import es.redmic.brokerlib.listener.SendListener; import es.redmic.exception.custom.ResourceNotFoundException; import es.redmic.utils.compressor.Zip; import es.redmic.utils.csv.DataLoaderIngestData; @Service public class AISService { // @formatter:off private static final String INVALID_USERNAME_OR_PASSWORD = "Invalid username or password", TOO_FREQUENT_REQUESTS = "Too frequent requests"; // @formatter:on protected static Logger logger = LogManager.getLogger(); @Value("${aishub.service.url}") private String urlAIS; private String directoryPath = System.getProperty("user.dir") + "/target"; private String nameCompressFile = "ais.zip"; private String nameFile = "data.csv"; private String delimiterCSV = ","; @Value("${broker.topic.realtime.tracking.vessels}") private String TOPIC; @Value("${file.delimiter.csv}") private String delimiterCSV; @Value("${property.path.media_storage.AIS_TEMP}") private String directoryPath; private String nameCompressFile = "ais.zip"; private String nameFile = "data.csv"; @Autowired private KafkaTemplate<String, AISTrackingDTO> kafkaTemplate; public void update() { public void fetchData() { updateFile(); prepareFile(); processFile(); } public void updateFile() { private void prepareFile() { downloadData(); unzip(); } public void downloadData() { private void downloadData() { try { URL url = new URL(getUrlAIS()); URL url = new URL(urlAIS); File destination = new File(getCompressFilePath()); FileUtils.copyURLToFile(url, destination); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); throw new ResourceNotFoundException(); } } public void unzip() { private void unzip() { Zip zip = new Zip(); zip.extract(getCompressFilePath(), getDirectoryPath() + "/"); zip.extract(getCompressFilePath(), directoryPath + "/"); removeZipFile(); } public void processFile() { private void processFile() { File file = new File(getFilePath()); DataLoaderIngestData dataLoader = new DataLoaderIngestData(file, getDelimiterCSV()); DataLoaderIngestData dataLoader = new DataLoaderIngestData(file, delimiterCSV); checkFile(dataLoader.getHeader()); Map<String, String> row; while ((row = dataLoader.read()) != null) { processRow(row); } file.delete(); } public void processRow(Map<String, String> row) { private void checkFile(List<String> header) { AISTrackingDTO dto = new AISTrackingDTO(); if (header.size() == 1) { if (header.get(0).contains(INVALID_USERNAME_OR_PASSWORD)) { dto.setMmsi(parseInteger(row.get("MMSI"))); dto.setTstamp(DateTime.parse(row.get("TSTAMP"), DateTimeFormat.forPattern("yyy-MM-dd HH:mm:ss zzz"))); dto.setLatitude(parseDouble(row.get("LATITUDE"))); dto.setLongitude(parseDouble(row.get("LONGITUDE"))); dto.setCog(parseDouble(row.get("COG"))); dto.setSog(parseDouble(row.get("SOG"))); dto.setDraught(parseDouble(row.get("DRAUGHT"))); dto.setType(parseInteger(row.get("TYPE"))); dto.setImo(parseInteger(row.get("IMO"))); dto.setHeading(parseInteger(row.get("HEADING"))); dto.setNavStat(parseInteger(row.get("NAVSTAT"))); dto.setName(row.get("NAME")); dto.setDest(row.get("DEST")); dto.setCallSign(row.get("CALLSIGN")); dto.setEta(row.get("ETA")); dto.setA(parseDouble(row.get("A"))); dto.setB(parseDouble(row.get("B"))); dto.setC(parseDouble(row.get("C"))); dto.setD(parseDouble(row.get("D"))); logger.error("Error en el fichero. " + INVALID_USERNAME_OR_PASSWORD); throw new InvalidUsernameException(); } else if (header.get(0).contains(TOO_FREQUENT_REQUESTS)) { emitDTO(dto); logger.error("Error en el fichero. " + TOO_FREQUENT_REQUESTS); throw new InvalidUsernameException(); } public Double parseDouble(String value) { if (value != null) { return Double.parseDouble(value); } return null; } public Integer parseInteger(String value) { private void processRow(Map<String, String> row) { if (value != null) { return Integer.parseInt(value); } AISTrackingDTO dto = new AISTrackingDTO(); dto.buildFromMap(row); return null; publishToKafka(dto); } public void emitDTO(AISTrackingDTO dto) { private void publishToKafka(AISTrackingDTO dto) { logger.info("Tracking vessel: " + dto.getMmsi()); Loading @@ -141,53 +136,19 @@ public class AISService { future.addCallback(new SendListener()); } public String getFilePath() { return getDirectoryPath() + "/" + getNameFile(); } public String getCompressFilePath() { private void removeZipFile() { return getDirectoryPath() + "/" + getNameCompressFile(); File file = new File(getCompressFilePath()); file.delete(); } public String getUrlAIS() { return urlAIS; } public String getDirectoryPath() { return directoryPath; } public String getNameCompressFile() { return nameCompressFile; } private String getFilePath() { public String getNameFile() { return nameFile; return directoryPath + "/" + nameFile; } public void setNameFile(String nameFile) { this.nameFile = nameFile; } public void setNameCompressFile(String nameCompressFile) { this.nameCompressFile = nameCompressFile; } public void setDirectoryPath(String directoryPath) { this.directoryPath = directoryPath; } public void setUrlAIS(String urlAIS) { this.urlAIS = urlAIS; } public String getDelimiterCSV() { return delimiterCSV; } private String getCompressFilePath() { public void setDelimiterCSV(String delimiterCSV) { this.delimiterCSV = delimiterCSV; return directoryPath + "/" + nameCompressFile; } }