Commit 14187c9b authored by Noel Alonso's avatar Noel Alonso
Browse files

Comenta funcionalidad de postUpdate

La agregación de tantos datos produce problemas en el stream, se comenta
la funcionalidad y tests hasta implementarlo sin agregación
parent 1895e576
Loading
Loading
Loading
Loading
+45 −45
Original line number Diff line number Diff line
package es.redmic.vesselscommands.streams;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import es.redmic.brokerlib.alert.AlertService;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.brokerlib.avro.common.EventError;
import es.redmic.brokerlib.avro.common.EventTypes;
import es.redmic.brokerlib.avro.serde.hashmap.HashMapSerde;
import es.redmic.commandslib.exceptions.ExceptionType;
import es.redmic.commandslib.streaming.common.StreamConfig;
import es.redmic.commandslib.streaming.streams.EventSourcingStreams;
import es.redmic.vesselscommands.commands.vessel.CreateVesselCommand;
import es.redmic.vesselscommands.commands.vessel.UpdateVesselCommand;
import es.redmic.vesselslib.dto.vessel.VesselDTO;
import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO;
import es.redmic.vesselslib.events.vessel.VesselEventFactory;
import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.common.VesselEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEnrichedEvent;
import es.redmic.vesselslib.events.vessel.create.EnrichCreateVesselEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.AggregationVesselTypeInVesselPostUpdateEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent;
import es.redmic.vesselslib.events.vessel.update.EnrichUpdateVesselEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselEnrichedEvent;
@@ -42,19 +33,21 @@ public class VesselEventStreams extends EventSourcingStreams {

	private String vesselTypeTopic;

	private String vesselsAggByVesselTypeTopic;
	// private String vesselsAggByVesselTypeTopic;

	private String vesselTypeUpdatedTopic;

	private String realtimeVesselsTopic;

	private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent> hashMapSerdeAggregationVesselTypeInVessel;
	// private HashMapSerde<String, AggregationVesselTypeInVesselPostUpdateEvent>
	// hashMapSerdeAggregationVesselTypeInVessel;

	private GlobalKTable<String, HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType;
	// private GlobalKTable<String, HashMap<String,
	// AggregationVesselTypeInVesselPostUpdateEvent>> aggByVesselType;

	private GlobalKTable<String, Event> vesselType;

	private KStream<String, Event> vesselTypeEvents;
	// private KStream<String, Event> vesselTypeEvents;

	private KStream<String, VesselDTO> realtimeVessel;

@@ -64,9 +57,10 @@ public class VesselEventStreams extends EventSourcingStreams {
			String vesselTypeUpdatedTopic, String realtimeVesselsTopic, AlertService alertService) {
		super(config, alertService);
		this.vesselTypeTopic = vesselTypeTopic + snapshotTopicSuffix;
		this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic;
		// this.vesselsAggByVesselTypeTopic = vesselsAggByVesselTypeTopic;
		this.vesselTypeUpdatedTopic = vesselTypeUpdatedTopic;
		this.hashMapSerdeAggregationVesselTypeInVessel = new HashMapSerde<>(schemaRegistry);
		// this.hashMapSerdeAggregationVesselTypeInVessel = new
		// HashMapSerde<>(schemaRegistry);
		this.realtimeVesselsTopic = realtimeVesselsTopic;

		init();
@@ -83,12 +77,12 @@ public class VesselEventStreams extends EventSourcingStreams {

		// Crea un store global para procesar los datos de todas las instancias de
		// vessels agregados por vesselType
		aggByVesselType = builder.globalTable(vesselsAggByVesselTypeTopic,
				Consumed.with(Serdes.String(), hashMapSerdeAggregationVesselTypeInVessel));
		/*-aggByVesselType = builder.globalTable(vesselsAggByVesselTypeTopic,
				Consumed.with(Serdes.String(), hashMapSerdeAggregationVesselTypeInVessel));-*/

		vesselType = builder.globalTable(vesselTypeTopic);

		vesselTypeEvents = builder.stream(vesselTypeUpdatedTopic);
		// vesselTypeEvents = builder.stream(vesselTypeUpdatedTopic);

		realtimeVessel = builder.stream(realtimeVesselsTopic);
	}
@@ -348,7 +342,7 @@ public class VesselEventStreams extends EventSourcingStreams {
	 * Función para procesar modificaciones de referencias
	 */

	@Override
	/*-@Override
	protected void processPostUpdateStream(KStream<String, Event> vesselEvents) {
	
		KStream<String, Event> vesselEventsStream = vesselEvents.filter((id, event) -> {
@@ -362,14 +356,14 @@ public class VesselEventStreams extends EventSourcingStreams {
	
		// processar los vesseltype modificados
		processVesselTypePostUpdate();
	}
	}-*/

	private String getVesselTypeIdFromVessel(Event evt) {

		return ((VesselEvent) evt).getVessel().getType().getId();
	}

	private void aggregateVesselsByVesselType(KStream<String, Event> vesselEventsStream) {
	/*-private void aggregateVesselsByVesselType(KStream<String, Event> vesselEventsStream) {
	
		vesselEventsStream.groupByKey()
				.aggregate(HashMap<String, AggregationVesselTypeInVesselPostUpdateEvent>::new,
@@ -436,7 +430,7 @@ public class VesselEventStreams extends EventSourcingStreams {
			}
		}
		return result;
	}
	}-*/

	@Override
	protected void processExtraStreams(KStream<String, Event> events, KStream<String, Event> snapshotEvents) {
@@ -487,4 +481,10 @@ public class VesselEventStreams extends EventSourcingStreams {
		evt.setUserId(REDMIC_PROCESS);
		return evt;
	}

	@Override
	protected void processPostUpdateStream(KStream<String, Event> events) {
		// TODO Auto-generated method stub

	}
}
+2 −48
Original line number Diff line number Diff line
package es.redmic.test.vesselscommands.integration.vessel;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import es.redmic.brokerlib.alert.AlertType;
import es.redmic.brokerlib.alert.Message;
import es.redmic.brokerlib.avro.common.Event;
import es.redmic.exception.common.ExceptionType;
import es.redmic.test.vesselscommands.integration.KafkaEmbeddedConfig;
import es.redmic.test.vesselscommands.integration.vesseltype.VesselTypeDataUtil;
import es.redmic.testutils.kafka.KafkaBaseIntegrationTest;
import es.redmic.vesselscommands.VesselsCommandsApplication;
import es.redmic.vesselslib.events.vessel.VesselEventTypes;
import es.redmic.vesselslib.events.vessel.common.VesselCancelledEvent;
import es.redmic.vesselslib.events.vessel.create.CreateVesselEvent;
import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent;
import es.redmic.vesselslib.events.vessel.partialupdate.vesseltype.UpdateVesselTypeInVesselEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselConfirmedEvent;
import es.redmic.vesselslib.events.vessel.update.UpdateVesselFailedEvent;
import es.redmic.vesselslib.events.vessel.update.VesselUpdatedEvent;
import es.redmic.vesselslib.events.vesseltype.update.VesselTypeUpdatedEvent;

@RunWith(SpringJUnit4ClassRunner.class)
/*-@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = { VesselsCommandsApplication.class })
@ActiveProfiles("test")
@DirtiesContext
@@ -211,4 +165,4 @@ public class VesselPostUpdateHandlerTest extends KafkaBaseIntegrationTest {
	public void defaultEvent(Object def) {

	}
}
}-*/
+10 −12
Original line number Diff line number Diff line
@@ -38,12 +38,10 @@ import es.redmic.exception.data.DeleteItemException;
import es.redmic.exception.data.ItemAlreadyExistException;
import es.redmic.exception.data.ItemNotFoundException;
import es.redmic.test.vesselscommands.integration.KafkaEmbeddedConfig;
import es.redmic.test.vesselscommands.integration.vessel.VesselDataUtil;
import es.redmic.testutils.kafka.KafkaBaseIntegrationTest;
import es.redmic.vesselscommands.VesselsCommandsApplication;
import es.redmic.vesselscommands.handler.VesselTypeCommandHandler;
import es.redmic.vesselslib.dto.vesseltype.VesselTypeDTO;
import es.redmic.vesselslib.events.vessel.create.VesselCreatedEvent;
import es.redmic.vesselslib.events.vesseltype.VesselTypeEventTypes;
import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeCancelledEvent;
import es.redmic.vesselslib.events.vesseltype.create.CreateVesselTypeConfirmedEvent;
@@ -267,7 +265,7 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest {

	// Envía un evento de comprobación de que el elemento puede ser borrado y debe
	// provocar un evento DeleteVesselTypeCheckFailedEvent ya que está referenciado
	@Test
	/*-@Test
	public void checkDeleteVesselTypeEvent_SendDeleteVesselTypeCheckFailedEvent_IfReceivesSuccess()
			throws InterruptedException {
	
@@ -292,7 +290,7 @@ public class VesselTypeCommandHandlerTest extends KafkaBaseIntegrationTest {
		assertEquals(event.getUserId(), confirm.getUserId());
		assertEquals(event.getSessionId(), confirm.getSessionId());
		assertEquals(event.getVersion(), confirm.getVersion());
	}
	}-*/

	// Envía un evento de error de borrado y debe provocar un evento Cancelled con
	// el item dentro