Commit 3de3c097 authored by Noel Alonso's avatar Noel Alonso
Browse files

Cambia streaming de postupdate

En lugar de agregar en una lista lo hace en un hashmap para que se sobreescriban
los eventos con la misma clave de vessel, manteniendo solamente el último en llegar
parent e66c544b
Loading
Loading
Loading
Loading
+18 −16
Original line number Diff line number Diff line
@@ -204,13 +204,12 @@ public class VesselEventStreams extends EventStreams {
	@Override
	protected void processPostUpdateStream(KStream<String, Event> vesselEvents) {

		KStream<String, Event> vesselEventsStreamByTypeId = vesselEvents
				.selectKey((k, v) -> ((VesselEvent) v).getVessel().getType().getId());
		KStream<String, Event> vesselEventsStreamByTypeId = vesselEvents.selectKey((k, v) -> v.getAggregateId());

		KTable<String, HashMap<String, VesselEvent>> vesselEventsTable = vesselEventsStreamByTypeId.groupByKey()
		KTable<String, HashMap<String, Event>> vesselEventsTable = vesselEventsStreamByTypeId.groupByKey()
				.aggregate(HashMap::new, (key, value, hashMap) -> {
					// Añade a hashmap cada uno de los values
					hashMap.put(value.getAggregateId(), (VesselEvent) value);
					hashMap.put(value.getAggregateId(), value);
					return hashMap;
				}, Materialized.with(Serdes.String(), new HashMapSerde<>(schemaRegistry)));

@@ -229,26 +228,30 @@ public class VesselEventStreams extends EventStreams {
	}

	private ArrayList<VesselEvent> getPostUpdateEvent(Event updateReferenceEvent,
			HashMap<String, VesselEvent> vesselWithReferenceEvents) {
			HashMap<String, Event> vesselWithReferenceEvents) {

		ArrayList<VesselEvent> result = new ArrayList<>();

		for (Map.Entry<String, VesselEvent> entry : vesselWithReferenceEvents.entrySet()) {
		for (Map.Entry<String, Event> entry : vesselWithReferenceEvents.entrySet()) {

			VesselEvent vesselEvent = entry.getValue();
			Event event = entry.getValue();
			VesselTypeDTO vesselType = ((VesselTypeEvent) updateReferenceEvent).getVesselType();

			if (!isEventCompleted(vesselEvent.getType())) {
			if (itemIsLocked(event.getType())) {

				String message = "Item con id " + vesselEvent.getAggregateId()
				if (!event.getType().equals(VesselEventType.VESSEL_DELETED.toString())) {
					String message = "Item con id " + event.getAggregateId()
							+ " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia "
							+ updateReferenceEvent.getAggregateId();

					logger.info(message);
				alertService.errorAlert(vesselEvent.getAggregateId(), message);
					alertService.errorAlert(event.getAggregateId(), message);
				}

			} else {

				VesselEvent vesselEvent = (VesselEvent) event;

				logger.debug("Creando evento de update para Vessel " + vesselEvent.getAggregateId()
						+ " por cambio en vesselType");

@@ -271,11 +274,10 @@ public class VesselEventStreams extends EventStreams {
		return result;
	}

	private boolean isEventCompleted(String type) {
	private boolean itemIsLocked(String type) {

		return (VesselEventType.VESSEL_CREATED.toString().equals(type)
		return !(VesselEventType.VESSEL_CREATED.toString().equals(type)
				|| VesselEventType.VESSEL_UPDATED.toString().equals(type)
				|| VesselEventType.VESSEL_DELETED.toString().equals(type)
				|| VesselEventType.CREATE_VESSEL_CANCELLED.toString().equals(type)
				|| VesselEventType.UPDATE_VESSEL_CANCELLED.toString().equals(type)
				|| VesselEventType.DELETE_VESSEL_CANCELLED.toString().equals(type));