Commit 625f94a1 authored by Noel Alonso's avatar Noel Alonso
Browse files

Añade compatibilidad con speed layer

De esta manera si un dato ya fue insertado por la speed layer, se
permite su modificación (upsert) por la batch layer
parent 83ea5dbe
Loading
Loading
Loading
Loading
+66 −25
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import es.redmic.elasticsearchlib.geodata.repository.RWGeoDataESRepository;
import es.redmic.exception.common.ExceptionType;
import es.redmic.models.es.common.dto.EventApplicationResult;
import es.redmic.models.es.common.query.dto.DataQueryDTO;
import es.redmic.vesselslib.utils.VesselTrackingUtil;
import es.redmic.vesselsview.model.vessel.Vessel;
import es.redmic.vesselsview.model.vesseltracking.VesselTracking;
import es.redmic.viewlib.geodata.repository.IGeoDataRepository;
@@ -61,28 +62,50 @@ public class VesselTrackingESRepository extends RWGeoDataESRepository<VesselTrac
		return update(vesselTrackingId, doc);
	}

	/**
	 * Función que comprueba que un elemento puede ser añadido a elasticsearch
	 * cumpliendo todas las restricciones
	 * 
	 * Que no exista un elemento con el mismo id. | Que no exista un elemento con el
	 * mismo mmsi para la misma fecha. | En caso de ser un elemento procesado (No
	 * speed layer), no debe existir un elmento con el mismo uuid
	 */

	@Override
	protected EventApplicationResult checkInsertConstraintsFulfilled(VesselTracking modelToIndex) {

		boolean notProcessed = modelToIndex.getUuid().equals(VesselTrackingUtil.UUID_DEFAULT);

		// @formatter:off

		QueryBuilder idTerm = QueryBuilders.termQuery(ID_PROPERTY, modelToIndex.getId()),
				uuidTerm = QueryBuilders.termQuery(UUID_PROPERTY, modelToIndex.getUuid()),
				mmsiTerm = QueryBuilders.boolQuery()
						.must(QueryBuilders.termQuery(MMSI_PROPERTY, modelToIndex.getProperties().getVessel().getMmsi()))
						.must(QueryBuilders.termQuery(DATE_PROPERTY, modelToIndex.getProperties().getDate()));
						.must(QueryBuilders.termQuery(DATE_PROPERTY, modelToIndex.getProperties().getDate())),
				uuidTerm = QueryBuilders.boolQuery()
					.must(QueryBuilders.termQuery(ID_PROPERTY, modelToIndex.getId()))
					.must(QueryBuilders.termQuery(UUID_PROPERTY, modelToIndex.getUuid()));
		
		if (!notProcessed) {
			QueryBuilder aux = uuidTerm;
			uuidTerm = QueryBuilders.boolQuery()
					.should(QueryBuilders.boolQuery()
							.must(QueryBuilders.termQuery(ID_PROPERTY, modelToIndex.getId()))
							.mustNot(QueryBuilders.termQuery(UUID_PROPERTY, VesselTrackingUtil.UUID_DEFAULT)))
					.should(aux);
		}
		
		SearchRequestBuilder requestBuilderId = ESProvider.getClient().prepareSearch(getIndex()).setTypes(getType())
				.setQuery(idTerm).setSize(1),
			requestBuilderUuid = ESProvider.getClient().prepareSearch(getIndex()).setTypes(getType())
				.setQuery(uuidTerm).setSize(1),
			requestBuilderMmsi = ESProvider.getClient().prepareSearch(getIndex()).setTypes(getType())
				.setQuery(mmsiTerm).setSize(1);
				.setQuery(mmsiTerm).setSize(1),
			requestBuilderUuid = ESProvider.getClient().prepareSearch(getIndex()).setTypes(getType())
				.setQuery(uuidTerm).setSize(1);

		MultiSearchRequestBuilder multiSearchRequestBuilder = ESProvider.getClient().prepareMultiSearch()
			.add(requestBuilderId)
			.add(requestBuilderUuid)
			.add(requestBuilderMmsi);
			.add(requestBuilderMmsi)
			.add(requestBuilderUuid);
		
		MultiSearchResponse sr = multiSearchRequestBuilder.get();

@@ -97,12 +120,15 @@ public class VesselTrackingESRepository extends RWGeoDataESRepository<VesselTrac
		}

		if (responses != null && responses[1].getResponse().getHits().getTotalHits() > 0) {
			arguments.put(UUID_PROPERTY, modelToIndex.getUuid().toString());
			arguments.put(MMSI_PROPERTY, modelToIndex.getProperties().getVessel().getMmsi().toString());
			arguments.put(DATE_PROPERTY, modelToIndex.getProperties().getDate().toString());
		}

		if (responses != null && responses[2].getResponse().getHits().getTotalHits() > 0) {
			arguments.put(MMSI_PROPERTY, modelToIndex.getProperties().getVessel().getMmsi().toString());
			arguments.put(DATE_PROPERTY, modelToIndex.getProperties().getDate().toString());
			arguments.put(UUID_PROPERTY, modelToIndex.getUuid().toString());
		} else if (!notProcessed) { // Si no es un item no procesado y el uuid del item almacenado no es diferente a
									// not_process entonces no se tienen en cuenta los conflictos
			return new EventApplicationResult(true);
		}

		if (arguments.size() > 0) {
@@ -112,29 +138,44 @@ public class VesselTrackingESRepository extends RWGeoDataESRepository<VesselTrac
		return new EventApplicationResult(true);
	}

	/**
	 * Función que comprueba que un elemento puede ser editado cumpliendo todas las
	 * restricciones
	 * 
	 * Que no exista un elemento con diferente id, pero el mismo mmsi y la misma
	 * fecha (Que ya exista). | Que no exista ese uuid en otro elemento
	 */

	// TODO: Controlar ediciones de items no procesados
	@Override
	protected EventApplicationResult checkUpdateConstraintsFulfilled(VesselTracking modelToIndex) {

		// @formatter:off

		BoolQueryBuilder idTerm = QueryBuilders.boolQuery()
				.must(QueryBuilders.termQuery(ID_PROPERTY, modelToIndex.getId()))
				.mustNot(QueryBuilders.termQuery(UUID_PROPERTY, modelToIndex.getUuid())),
				
				mmsiTerm = QueryBuilders.boolQuery()
		BoolQueryBuilder mmsiTerm = QueryBuilders.boolQuery()
					.must(QueryBuilders.termQuery(MMSI_PROPERTY, modelToIndex.getProperties().getVessel().getMmsi()))
					.must(QueryBuilders.termQuery(DATE_PROPERTY, modelToIndex.getProperties().getDate()))
					.mustNot(QueryBuilders.termQuery(ID_PROPERTY, modelToIndex.getId()));
					.mustNot(QueryBuilders.termQuery(ID_PROPERTY, modelToIndex.getId())),
				idTerm = QueryBuilders.boolQuery()
					.must(QueryBuilders.termQuery(ID_PROPERTY, modelToIndex.getId()))
					.mustNot(QueryBuilders.termQuery(UUID_PROPERTY, modelToIndex.getUuid()));
		
		SearchRequestBuilder requestBuilderId = ESProvider.getClient().prepareSearch(getIndex()).setTypes(getType())
				.setQuery(idTerm).setSize(1),
			requestBuilderMmsi = ESProvider.getClient().prepareSearch(getIndex()).setTypes(getType())
				.setQuery(mmsiTerm).setSize(1);
		SearchRequestBuilder requestBuilderMmsi = ESProvider.getClient().prepareSearch(getIndex()).setTypes(getType())
				.setQuery(mmsiTerm).setSize(1),
				requestBuilderId = null;
		
		if (idTerm != null) {
			requestBuilderId = ESProvider.getClient().prepareSearch(getIndex()).setTypes(getType())
				.setQuery(idTerm).setSize(1);
		}

		MultiSearchRequestBuilder multiSearchRequestBuilder = ESProvider.getClient().prepareMultiSearch()
			.add(requestBuilderId)
			.add(requestBuilderMmsi);
		
		if (requestBuilderId != null) {
			multiSearchRequestBuilder.add(requestBuilderId);
		}
		
		MultiSearchResponse sr = multiSearchRequestBuilder.get();

		// @formatter:on
@@ -144,14 +185,14 @@ public class VesselTrackingESRepository extends RWGeoDataESRepository<VesselTrac
		Item[] responses = sr.getResponses();

		if (responses != null && responses[0].getResponse().getHits().getTotalHits() > 0) {
			arguments.put(ID_PROPERTY, modelToIndex.getId().toString());
		}

		if (responses != null && responses[1].getResponse().getHits().getTotalHits() > 0) {
			arguments.put(MMSI_PROPERTY, modelToIndex.getProperties().getVessel().getMmsi().toString());
			arguments.put(DATE_PROPERTY, modelToIndex.getProperties().getDate().toString());
		}

		if (responses != null && requestBuilderId != null && responses[1].getResponse().getHits().getTotalHits() > 0) {
			arguments.put(ID_PROPERTY, modelToIndex.getId().toString());
		}

		if (arguments.size() > 0) {
			return new EventApplicationResult(ExceptionType.ES_UPDATE_DOCUMENT.toString(), arguments);
		}
+31 −0
Original line number Diff line number Diff line
@@ -60,6 +60,7 @@ import es.redmic.vesselslib.events.vesseltracking.delete.DeleteVesselTrackingFai
import es.redmic.vesselslib.events.vesseltracking.update.UpdateVesselTrackingConfirmedEvent;
import es.redmic.vesselslib.events.vesseltracking.update.UpdateVesselTrackingEvent;
import es.redmic.vesselslib.events.vesseltracking.update.UpdateVesselTrackingFailedEvent;
import es.redmic.vesselslib.utils.VesselTrackingUtil;
import es.redmic.vesselsview.VesselsViewApplication;
import es.redmic.vesselsview.model.vesseltracking.VesselTracking;
import es.redmic.vesselsview.repository.vesseltracking.VesselTrackingESRepository;
@@ -148,6 +149,36 @@ public class VesselTrackingEventHandlerTest extends DocumentationViewBaseTest {
		assertEqualsVesselTracking(vesselTracking, event.getVesselTracking());
	}

	@Test
	public void sendVesselTrackingCreatedEvent_SaveItem_IfItemIsNotProcessed() throws Exception {

		CreateVesselTrackingEvent event = getCreateVesselTrackingEvent();

		event.getVesselTracking().setUuid(VesselTrackingUtil.UUID_DEFAULT);

		repository.save(mapper.getMapperFacade().map(event.getVesselTracking(), VesselTracking.class));

		event.getVesselTracking().setUuid(UUID.randomUUID().toString());

		ListenableFuture<SendResult<String, Event>> future = kafkaTemplate.send(VESSELTRACKING_TOPIC,
				event.getAggregateId(), event);
		future.addCallback(new SendListener());

		Event confirm = (Event) blockingQueue.poll(50, TimeUnit.SECONDS);

		GeoHitWrapper<?> item = repository.findById(event.getVesselTracking().getId());
		assertNotNull(item.get_source());

		// Se restablece el estado de la vista
		repository.delete(event.getVesselTracking().getId());

		assertNotNull(confirm);
		assertEquals(VesselTrackingEventTypes.CREATE_CONFIRMED, confirm.getType());

		VesselTracking vesselTracking = (VesselTracking) item.get_source();
		assertEqualsVesselTracking(vesselTracking, event.getVesselTracking());
	}

	@Test
	public void sendVesselTrackingUpdatedEvent_UpdateItem_IfEventIsOk() throws Exception {