Commit e6777713 authored by Noel Alonso's avatar Noel Alonso
Browse files

Agrega en una clase minimizada para reducir datos

De este modo el topic intermedio que se crea, no aumenta tanto de tamaño
parent 41200b69
Loading
Loading
Loading
Loading
+20 −18
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.common.VesselEvent;
import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent;
import es.redmic.vesselslib.events.vessel.delete.DeleteVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent;
@@ -30,7 +31,6 @@ import es.redmic.vesselslib.events.vesseltype.common.VesselTypeEvent;

public class VesselEventStreams extends EventStreams {

	private static final String REDMIC_PROCESS = "REDMIC_PROCESS";
	private String vesselTypeTopic;

	public VesselEventStreams(StreamConfig config, String vesselTypeTopic, AlertService alertService) {
@@ -229,7 +229,6 @@ public class VesselEventStreams extends EventStreams {

		logger.info("Enviando evento UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId());

		if (failedEvent.getUserId().equals(REDMIC_PROCESS))
		alertService.errorAlert("UpdateVesselCancelledEvent para: " + failedEvent.getAggregateId(),
				eventError.getExceptionType() + " " + eventError.getArguments());

@@ -273,10 +272,12 @@ public class VesselEventStreams extends EventStreams {
		KStream<String, Event> vesselEventsStreamByTypeId = vesselEventsStream
				.selectKey((k, v) -> ((VesselEvent) v).getVessel().getType().getId());

		KTable<String, HashMap<String, VesselEvent>> vesselEventsTable = vesselEventsStreamByTypeId.groupByKey()
				.aggregate(HashMap::new, (key, value, hashMap) -> {
		KTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> vesselEventsTable = vesselEventsStreamByTypeId
				.groupByKey().aggregate(HashMap::new, (key, value, hashMap) -> {
					// Añade a hashmap cada uno de los values
					hashMap.put(value.getAggregateId(), (VesselEvent) value);
					hashMap.put(value.getAggregateId(),
							new AggregationVesselTypeInVesselPostUpdateEvent(value.getType(),
									((VesselEvent) value).getVessel().getType()).buildFrom(value));
					return hashMap;
				}, Materialized.with(Serdes.String(), new HashMapSerde<>(schemaRegistry)));

@@ -295,37 +296,38 @@ public class VesselEventStreams extends EventStreams {
	}

	private ArrayList<UpdateVesselTypeInVesselEvent> getPostUpdateEvent(Event updateReferenceEvent,
			HashMap<String, VesselEvent> vesselWithReferenceEvents) {
			HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> vesselWithReferenceEvents) {

		ArrayList<UpdateVesselTypeInVesselEvent> result = new ArrayList<>();

		for (Map.Entry<String, VesselEvent> entry : vesselWithReferenceEvents.entrySet()) {
		for (Map.Entry<String, AggregationVesselTypeInVesselPostUpdateEvent> entry : vesselWithReferenceEvents
				.entrySet()) {

			VesselEvent vesselEvent = entry.getValue();
			AggregationVesselTypeInVesselPostUpdateEvent aggregationEvent = entry.getValue();
			VesselTypeDTO vesselType = ((VesselTypeEvent) updateReferenceEvent).getVesselType();

			if (VesselEventTypes.isLocked(vesselEvent.getType())) {
			if (VesselEventTypes.isLocked(aggregationEvent.getType())) {

				if (!vesselEvent.getType().equals(VesselEventTypes.DELETED)) {
					String message = "Item con id " + vesselEvent.getAggregateId()
				if (!aggregationEvent.getType().equals(VesselEventTypes.DELETED)) {
					String message = "Item con id " + aggregationEvent.getAggregateId()
							+ " se encuentra en mitad de un ciclo de creación o edición, por lo que no se modificó la referencia "
							+ updateReferenceEvent.getAggregateId();

					logger.info(message);
					alertService.errorAlert(vesselEvent.getAggregateId(), message);
					alertService.errorAlert(aggregationEvent.getAggregateId(), message);
				}

			} else {

				logger.debug("Creando evento de update para Vessel " + vesselEvent.getAggregateId()
				logger.debug("Creando evento de update para Vessel " + aggregationEvent.getAggregateId()
						+ " por cambio en vesselType");

				if (!vesselEvent.getVessel().getType().equals(vesselType)) {
				if (!aggregationEvent.getVesselType().equals(vesselType)) {

					UpdateVesselTypeInVesselEvent updateVesselType = new UpdateVesselTypeInVesselEvent();
					updateVesselType.setAggregateId(vesselEvent.getAggregateId());
					updateVesselType.setAggregateId(aggregationEvent.getAggregateId());
					updateVesselType.setUserId(updateReferenceEvent.getUserId());
					updateVesselType.setVersion(vesselEvent.getVersion() + 1);
					updateVesselType.setVersion(aggregationEvent.getVersion() + 1);
					updateVesselType.setVesselType(((VesselTypeEvent) updateReferenceEvent).getVesselType());
					result.add(updateVesselType);