Commit 9cd10fdf authored by Noel Alonso's avatar Noel Alonso
Browse files

Cambia a stream principal a la hora de hacer join

Para comprobar si el barco existe en kafka, se hace join con el stream
principal en lugar del snapshot
parent 14187c9b
Loading
Loading
Loading
Loading
+7 −4
Original line number Diff line number Diff line
@@ -35,7 +35,7 @@ public class VesselEventStreams extends EventSourcingStreams {

	// private String vesselsAggByVesselTypeTopic;

	private String vesselTypeUpdatedTopic;
	// private String vesselTypeUpdatedTopic;

	private String realtimeVesselsTopic;

@@ -58,7 +58,7 @@ public class VesselEventStreams extends EventSourcingStreams {
		super(config, alertService);
		this.vesselTypeTopic = vesselTypeTopic + snapshotTopicSuffix;
		// this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic;
		this.vesselTypeUpdatedTopic = vesselTypeUpdatedTopic;
		// this.vesselTypeUpdatedTopic = vesselTypeUpdatedTopic;
		// this.hashMapSerdeAggregationVesselTypeInVessel = new
		// HashMapSerde<>(schemaRegistry);
		this.realtimeVesselsTopic = realtimeVesselsTopic;
@@ -435,7 +435,7 @@ public class VesselEventStreams extends EventSourcingStreams {
	@Override
	protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) {

		createVesselFromRealtimeVessel(realtimeVessel, snapshotEvents);
		createVesselFromRealtimeVessel(realtimeVessel, events);
	}

	private void createVesselFromRealtimeVessel(KStream<String, VesselDTO> realtimeVessel,
@@ -453,7 +453,10 @@ public class VesselEventStreams extends EventSourcingStreams {
		if (vesselEvent == null) {
			return getEnrichCreateVesselEventFromRealtimeVessel(vesselDTO);
		}
		if (!((VesselEvent) vesselEvent).getVessel().equals(vesselDTO)) {

		VesselDTO currentVesselDTO = ((VesselEvent) vesselEvent).getVessel();

		if (currentVesselDTO != null && !currentVesselDTO.equals(vesselDTO)) {
			return getEnrichUpdateVesselEventFromRealtimeVessel(vesselDTO, vesselEvent);
		}
		return null;