Commit 25b7dc45 authored by Noel Alonso's avatar Noel Alonso
Browse files

Envía eventos al topic + cambia estructura

parent 83f1f8ef
Loading
Loading
Loading
Loading
+8 −4
Original line number Diff line number Diff line
@@ -7,8 +7,6 @@ import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import com.google.common.collect.ImmutableMap;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
@@ -105,9 +103,11 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
				.filter((id, event) -> (EventTypes.CHECK_DELETE.equals(event.getType())));

		deleteEvents.leftJoin(aggByVesselType,
				(deleteEvent, vesselAggByVesselType) -> getDeleteResultEvent(deleteEvent, vesselAggByVesselType));
				(deleteEvent, vesselAggByVesselType) -> getDeleteResultEvent(deleteEvent, vesselAggByVesselType))
				.to(topic);
	}

	@SuppressWarnings("serial")
	private Event getDeleteResultEvent(Event deleteEvent,
			HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent> vesselAggByVesselType) {

@@ -117,7 +117,11 @@ public class VesselTypeEventStreams extends EventSourcingStreams {
		} else { // elemento referenciado

			return VesselTypeEventFactory.getEvent(deleteEvent, VesselTypeEventTypes.DELETE_CHECK_FAILED,
					ExceptionType.ITEM_REFERENCED.toString(), ImmutableMap.of("id", deleteEvent.getAggregateId()));
					ExceptionType.ITEM_REFERENCED.toString(), new HashMap<String, String>() {
						{
							put("id", deleteEvent.getAggregateId());
						}
					});
		}
	}