Commit f154d527 authored by Noel Alonso's avatar Noel Alonso
Browse files

Divide los streaming y añade assert

Para controlar condiciones básicas para el streaming se añaden assert que en
caso de no cumplirse, producirá un error.
parent 23cf1458
Loading
Loading
Loading
Loading
+92 −78
Original line number Diff line number Diff line
@@ -62,6 +62,10 @@ public class VesselEventStreams extends EventStreams {

	private Event getCreatedEvent(Event confirmedEvent, Event requestEvent) {

		assert requestEvent.getType().equals(VesselEventType.CREATE_VESSEL.name());

		assert confirmedEvent.getType().equals(VesselEventType.CREATE_VESSEL_CONFIRMED.name());

		if (!isSameSession(confirmedEvent, requestEvent)) {
			String message = "Recibido evento de petición con id de sessión diferente al evento de confirmación para item "
					+ confirmedEvent.getAggregateId();
@@ -70,26 +74,13 @@ public class VesselEventStreams extends EventStreams {
			return null;
		}

		if (!(requestEvent.getType().equals(VesselEventType.CREATE_VESSEL.name()))) {
			logger.error("Se esperaba un evento de petición de tipo CREATE para Vessel.");
			return null;
		}

		logger.debug("Creando evento de creado exitoso para Vessel");

		VesselDTO vessel = ((VesselEvent) requestEvent).getVessel();

		if (confirmedEvent.getType().equals(VesselEventType.CREATE_VESSEL_CONFIRMED.name())) {

			logger.info("Enviando evento VesselCreatedEvent para: " + confirmedEvent.getAggregateId());
		logger.info("Creando evento VesselCreatedEvent para: " + confirmedEvent.getAggregateId());

		VesselCreatedEvent successfulEvent = new VesselCreatedEvent().buildFrom(confirmedEvent);
		successfulEvent.setVessel(vessel);
		return successfulEvent;
		} else {
			logger.error("Se esperaba un evento de confirmación de tipo CREATE para Vessel.");
			return null;
		}
	}

	@Override
@@ -111,6 +102,10 @@ public class VesselEventStreams extends EventStreams {

	private Event getUpdatedEvent(Event confirmedEvent, Event requestEvent) {

		assert requestEvent.getType().equals(VesselEventType.UPDATE_VESSEL.name());

		assert confirmedEvent.getType().equals(VesselEventType.UPDATE_VESSEL_CONFIRMED.name());

		if (!isSameSession(confirmedEvent, requestEvent)) {
			String message = "Recibido evento de petición con id de sessión diferente al evento de confirmación para item "
					+ confirmedEvent.getAggregateId();
@@ -119,65 +114,72 @@ public class VesselEventStreams extends EventStreams {
			return null;
		}

		if (!(requestEvent.getType().equals(VesselEventType.UPDATE_VESSEL.name()))) {
			logger.error("Se esperaba un evento de petición de UPDATE para Vessel.");
			return null;
		}

		logger.debug("Creando evento de modificado exitoso para Vessel");

		VesselDTO vessel = ((VesselEvent) requestEvent).getVessel();

		if (confirmedEvent.getType().equals(VesselEventType.UPDATE_VESSEL_CONFIRMED.name())) {

			logger.info("Enviar evento VesselUpdatedEvent para: " + confirmedEvent.getAggregateId());
		logger.info("Creando evento VesselUpdatedEvent para: " + confirmedEvent.getAggregateId());

		VesselUpdatedEvent successfulEvent = new VesselUpdatedEvent().buildFrom(confirmedEvent);
		successfulEvent.setVessel(vessel);
		return successfulEvent;
		} else {
			logger.error("Se esperaba un evento de confirmación de tipo UPDATE para Vessel.");
			return null;
		}
	}

	@Override
	protected void processFailedChangeStream(KStream<String, Event> vesselEvents) {

		// Stream filtrado por eventos de fallo al modificar y borrar
		KStream<String, Event> failedEvents = vesselEvents
				.filter((id, event) -> (VesselEventType.UPDATE_VESSEL_FAILED.toString().equals(event.getType())
						|| VesselEventType.DELETE_VESSEL_FAILED.toString().equals(event.getType())));

		// Stream filtrado por eventos de creaciones y modificaciones correctos (solo el
		// último que se produzca por id)
		KStream<String, Event> successEvents = vesselEvents
				.filter((id, event) -> (VesselEventType.VESSEL_CREATED.toString().equals(event.getType())
						|| VesselEventType.VESSEL_UPDATED.toString().equals(event.getType())));

		processUpdateFailedStream(vesselEvents, successEvents);

		processDeleteFailedStream(vesselEvents, successEvents);
	}

	protected void processUpdateFailedStream(KStream<String, Event> vesselEvents,
			KStream<String, Event> successEvents) {

		// Stream filtrado por eventos de fallo al modificar
		KStream<String, Event> failedEvents = vesselEvents
				.filter((id, event) -> (VesselEventType.UPDATE_VESSEL_FAILED.toString().equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents.join(successEventsTable,
				(failedEvent, lastSuccessEvent) -> getCancelledEvent(failedEvent, lastSuccessEvent)).to(topic);
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getUpdateCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

	private Event getCancelledEvent(Event failedEvent, Event lastSuccessEvent) {
	protected void processDeleteFailedStream(KStream<String, Event> vesselEvents,
			KStream<String, Event> successEvents) {

		if (!(lastSuccessEvent.getType().equals(VesselEventType.VESSEL_CREATED.name())
				|| lastSuccessEvent.getType().equals(VesselEventType.VESSEL_UPDATED.name()))) {
			logger.error("Se esperaba un evento satisfactorio de tipo CREATED o UPDATED para Vessel.");
			return null;
		// Stream filtrado por eventos de fallo al borrar
		KStream<String, Event> failedEvents = vesselEvents
				.filter((id, event) -> (VesselEventType.DELETE_VESSEL_FAILED.toString().equals(event.getType())));

		KTable<String, Event> successEventsTable = successEvents.groupByKey().reduce((aggValue, newValue) -> newValue);

		// Join por id, mandando a kafka el evento de compensación
		failedEvents
				.join(successEventsTable,
						(failedEvent, lastSuccessEvent) -> getDeleteCancelledEvent(failedEvent, lastSuccessEvent))
				.to(topic);
	}

		logger.debug("Creando evento de cancelación para Vessel");
	private Event getUpdateCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert lastSuccessEvent.getType().equals(VesselEventType.VESSEL_CREATED.name())
				|| lastSuccessEvent.getType().equals(VesselEventType.VESSEL_UPDATED.name());

		assert failedEvent.getType().equals(VesselEventType.UPDATE_VESSEL_FAILED.name());

		VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel();

		EventError eventError = (EventError) failedEvent;

		if (failedEvent.getType().equals(VesselEventType.UPDATE_VESSEL_FAILED.name())) {

		logger.info("Enviando evento UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId());

		UpdateVesselCancelledEvent cancelledEvent = new UpdateVesselCancelledEvent().buildFrom(failedEvent);
@@ -185,8 +187,18 @@ public class VesselEventStreams extends EventStreams {
		cancelledEvent.setExceptionType(eventError.getExceptionType());
		cancelledEvent.setArguments(eventError.getArguments());
		return cancelledEvent;
	}

	private Event getDeleteCancelledEvent(Event failedEvent, Event lastSuccessEvent) {

		assert lastSuccessEvent.getType().equals(VesselEventType.VESSEL_CREATED.name())
				|| lastSuccessEvent.getType().equals(VesselEventType.VESSEL_UPDATED.name());

		assert failedEvent.getType().equals(VesselEventType.DELETE_VESSEL_FAILED.name());

		VesselDTO vessel = ((VesselEvent) lastSuccessEvent).getVessel();

		} else if (failedEvent.getType().equals(VesselEventType.DELETE_VESSEL_FAILED.name())) {
		EventError eventError = (EventError) failedEvent;

		logger.info("Enviando evento DeleteVesselCancelledEvent para: " + failedEvent.getAggregateId());

@@ -195,21 +207,25 @@ public class VesselEventStreams extends EventStreams {
		cancelledEvent.setExceptionType(eventError.getExceptionType());
		cancelledEvent.setArguments(eventError.getArguments());
		return cancelledEvent;
		} else {
			logger.error("Se esperaba un evento fallido de tipo UPDATE o DELETE para Vessel.");
			return null;
		}
	}

	@Override
	protected void processPostUpdateStream(KStream<String, Event> vesselEvents) {

		KStream<String, Event> vesselEventsStreamByTypeId = vesselEvents.selectKey((k, v) -> v.getAggregateId());
		// Stream filtrado por eventos que contengan Vessel dentro que son los de
		// comienzo y final
		// del ciclo
		KStream<String, Event> vesselEventsStream = vesselEvents.filter((id, event) -> {
			return (event instanceof VesselEvent);
		});

		KStream<String, Event> vesselEventsStreamByTypeId = vesselEventsStream
				.selectKey((k, v) -> ((VesselEvent) v).getVessel().getType().getId());

		KTable<String, HashMap<String, Event>> vesselEventsTable = vesselEventsStreamByTypeId.groupByKey()
		KTable<String, HashMap<String, VesselEvent>> vesselEventsTable = vesselEventsStreamByTypeId.groupByKey()
				.aggregate(HashMap::new, (key, value, hashMap) -> {
					// Añade a hashmap cada uno de los values
					hashMap.put(value.getAggregateId(), value);
					hashMap.put(value.getAggregateId(), (VesselEvent) value);
					return hashMap;
				}, Materialized.with(Serdes.String(), new HashMapSerde<>(schemaRegistry)));

@@ -228,30 +244,28 @@ public class VesselEventStreams extends EventStreams {
	}

	private ArrayList<VesselEvent> getPostUpdateEvent(Event updateReferenceEvent,
			HashMap<String, Event> vesselWithReferenceEvents) {
			HashMap<String, VesselEvent> vesselWithReferenceEvents) {

		ArrayList<VesselEvent> result = new ArrayList<>();

		for (Map.Entry<String, Event> entry : vesselWithReferenceEvents.entrySet()) {
		for (Map.Entry<String, VesselEvent> entry : vesselWithReferenceEvents.entrySet()) {

			Event event = entry.getValue();
			VesselEvent vesselEvent = entry.getValue();
			VesselTypeDTO vesselType = ((VesselTypeEvent) updateReferenceEvent).getVesselType();

			if (itemIsLocked(event.getType())) {
			if (itemIsLocked(vesselEvent.getType())) {

				if (!event.getType().equals(VesselEventType.VESSEL_DELETED.toString())) {
					String message = "Item con id " + event.getAggregateId()
				if (!vesselEvent.getType().equals(VesselEventType.VESSEL_DELETED.toString())) {
					String message = "Item con id " + vesselEvent.getAggregateId()
							+ " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia "
							+ updateReferenceEvent.getAggregateId();

					logger.info(message);
					alertService.errorAlert(event.getAggregateId(), message);
					alertService.errorAlert(vesselEvent.getAggregateId(), message);
				}

			} else {

				VesselEvent vesselEvent = (VesselEvent) event;

				logger.debug("Creando evento de update para Vessel " + vesselEvent.getAggregateId()
						+ " por cambio en vesselType");