+1 −1
Original line number Original line Diff line number Diff line
@@ -55,7 +55,7 @@


		<dependency>
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>transport</artifactId>
			<artifactId>elasticsearch-rest-high-level-client</artifactId>
		</dependency>
		</dependency>


		<!-- Other -->
		<!-- Other -->
+239 −114

File changed.

Preview size limit exceeded, changes collapsed.

+59 −50
Original line number Original line Diff line number Diff line
package es.redmic.elasticsearchlib.common.utils;
package es.redmic.elasticsearchlib.common.utils;


import java.io.IOException;
import java.util.ArrayList;
import java.util.ArrayList;
import java.util.List;
import java.util.List;
import java.util.Map;
import java.util.Map;
import java.util.concurrent.ExecutionException;


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Component;


@@ -46,57 +46,59 @@ public class ElasticPersistenceUtils<TModel extends BaseES<?>> {


		// @formatter:off
		// @formatter:off
		
		
		IndexResponse result = ESProvider.getClient()
		IndexRequest request = new IndexRequest(index, type)
			.prepareIndex(index, type)
				.source(convertTModelToSource(model))
			.setSource(convertTModelToSource(model))
				.id(id)
			.setId(id)
				.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
			.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
				.execute()
					.actionGet();
		
		
		// @formatter:on
		// @formatter:on
		int failed = result.getShardInfo().getFailed();


		if (failed != 0) {
		try {
			ESProvider.getClient().index(request, RequestOptions.DEFAULT);
			return new EventApplicationResult(true);
		} catch (IOException e) {
			logger.debug("Error indexando en " + index + " " + type);
			logger.debug("Error indexando en " + index + " " + type);
			e.printStackTrace();
			return new EventApplicationResult(ExceptionType.ES_INDEX_DOCUMENT.toString());
			return new EventApplicationResult(ExceptionType.ES_INDEX_DOCUMENT.toString());
		}
		}

		return new EventApplicationResult(true);
	}
	}


	public EventApplicationResult update(String index, String type, TModel model, String id) {
	public EventApplicationResult update(String index, String type, TModel model, String id) {


		UpdateRequest updateRequest = new UpdateRequest();
		// @formatter:off
		updateRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);

		updateRequest.index(index);
		UpdateRequest updateRequest = new UpdateRequest(index, type, id)
		updateRequest.type(type);
				.doc(convertTModelToSource(model))
		updateRequest.id(id);
				.fetchSource(false)
		updateRequest.doc(convertTModelToSource(model));
				.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
		updateRequest.fetchSource(true);

		// @formatter:on


		try {
		try {
			ESProvider.getClient().update(updateRequest).get();
			ESProvider.getClient().update(updateRequest, RequestOptions.DEFAULT);
		} catch (InterruptedException | ExecutionException e) {

			return new EventApplicationResult(true);
		} catch (IOException e) {
			logger.debug("Error modificando el item con id " + id + " en " + index + " " + type);
			logger.debug("Error modificando el item con id " + id + " en " + index + " " + type);
			e.printStackTrace();
			return new EventApplicationResult(ExceptionType.ES_UPDATE_DOCUMENT.toString());
			return new EventApplicationResult(ExceptionType.ES_UPDATE_DOCUMENT.toString());
		}
		}


		return new EventApplicationResult(true);
	}
	}


	public EventApplicationResult update(String index, String type, String id, XContentBuilder doc) {
	public EventApplicationResult update(String index, String type, String id, XContentBuilder doc) {


		UpdateRequest updateRequest = new UpdateRequest();
		// @formatter:off
		updateRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
		
		updateRequest.index(index);
		UpdateRequest updateRequest = new UpdateRequest(index, type, id)
		updateRequest.type(type);
				.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
		updateRequest.id(id);
				.doc(doc)
		updateRequest.doc(doc);
				.fetchSource(true);
		updateRequest.fetchSource(true);
		
		// @formatter:on


		try {
		try {
			ESProvider.getClient().update(updateRequest).get();
			ESProvider.getClient().update(updateRequest, RequestOptions.DEFAULT);
		} catch (Exception e) {
		} catch (Exception e) {
			logger.debug("Error modificando el item con id " + id + " en " + index + " " + type);
			logger.debug("Error modificando el item con id " + id + " en " + index + " " + type);
			return new EventApplicationResult(ExceptionType.ES_UPDATE_DOCUMENT.toString());
			return new EventApplicationResult(ExceptionType.ES_UPDATE_DOCUMENT.toString());
@@ -107,21 +109,15 @@ public class ElasticPersistenceUtils<TModel extends BaseES<?>> {


	public EventApplicationResult delete(String index, String type, String id) {
	public EventApplicationResult delete(String index, String type, String id) {


		// @formatter:off
		DeleteRequest deleteRequest = new DeleteRequest(index, type, id).setRefreshPolicy(RefreshPolicy.IMMEDIATE);

		DeleteResponse result = ESProvider.getClient()
			.prepareDelete(index, type, id)
			.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
				.execute()
					.actionGet();

		// @formatter:on


		if (!result.status().equals(RestStatus.OK)) {
		try {
			ESProvider.getClient().delete(deleteRequest, RequestOptions.DEFAULT);
			return new EventApplicationResult(true);
		} catch (IOException e) {
			logger.debug("Error borrando el item con id " + id + " en " + index + " " + type);
			logger.debug("Error borrando el item con id " + id + " en " + index + " " + type);
			return new EventApplicationResult(ExceptionType.DELETE_ITEM_EXCEPTION.toString());
			return new EventApplicationResult(ExceptionType.DELETE_ITEM_EXCEPTION.toString());
		}
		}
		return new EventApplicationResult(true);
	}
	}


	@SuppressWarnings("unchecked")
	@SuppressWarnings("unchecked")
@@ -167,13 +163,19 @@ public class ElasticPersistenceUtils<TModel extends BaseES<?>> {


	public List<UpdateResponse> updateByBulk(List<UpdateRequest> listUpdates) {
	public List<UpdateResponse> updateByBulk(List<UpdateRequest> listUpdates) {


		BulkRequestBuilder bulkRequest = ESProvider.getClient().prepareBulk();
		BulkRequest bulkRequest = new BulkRequest();


		for (int i = 0; i < listUpdates.size(); i++)
		for (int i = 0; i < listUpdates.size(); i++)
			bulkRequest.add(listUpdates.get(i));
			bulkRequest.add(listUpdates.get(i));
		bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
		bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);


		BulkResponse bulkResponse = bulkRequest.execute().actionGet();
		BulkResponse bulkResponse;
		try {
			bulkResponse = ESProvider.getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
			throw new ESUpdateException("Error ejecutando modificación en batch");
		}


		if (bulkResponse.hasFailures()) {
		if (bulkResponse.hasFailures()) {


@@ -197,12 +199,19 @@ public class ElasticPersistenceUtils<TModel extends BaseES<?>> {


	public List<IndexResponse> indexByBulk(List<IndexRequest> listIndexs) {
	public List<IndexResponse> indexByBulk(List<IndexRequest> listIndexs) {


		BulkRequestBuilder bulkRequest = ESProvider.getClient().prepareBulk();
		BulkRequest bulkRequest = new BulkRequest();


		for (int i = 0; i < listIndexs.size(); i++)
		for (int i = 0; i < listIndexs.size(); i++)
			bulkRequest.add(listIndexs.get(i));
			bulkRequest.add(listIndexs.get(i));
		bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
		bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
		BulkResponse bulkResponse = bulkRequest.execute().actionGet();

		BulkResponse bulkResponse;
		try {
			bulkResponse = ESProvider.getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
			throw new ESUpdateException("Error ejecutando indexación en batch");
		}


		if (bulkResponse.hasFailures())
		if (bulkResponse.hasFailures())
			throw new ESUpdateException(bulkResponse.buildFailureMessage());
			throw new ESUpdateException(bulkResponse.buildFailureMessage());
+0 −34
Original line number Original line Diff line number Diff line
package es.redmic.elasticsearchlib.config;

import java.util.List;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfiguration {

	@Value("#{'${elastic.addresses}'.split(',')}")
	private List<String> addresses;
	@Value("${elastic.port}")
	private Integer port;
	@Value("${elastic.clusterName}")
	private String clusterName;
	@Value("${elastic.user}")
	private String user;
	@Value("${elastic.password}")
	private String password;

	@Bean
	public EsClientProvider esClientProvider() {

		EsConfig elastic = new EsConfig();
		elastic.setAddresses(addresses);
		elastic.setPort(port);
		elastic.setClusterName(clusterName);
		elastic.setUser(user);
		elastic.setPassword(password);
		return new EsClientProvider(elastic);
	}
}
+44 −36
Original line number Original line Diff line number Diff line
package es.redmic.elasticsearchlib.config;
package es.redmic.elasticsearchlib.config;


import java.net.InetAddress;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.List;


import javax.annotation.PostConstruct;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.PreDestroy;


import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;


import es.redmic.exception.common.ExceptionType;
import es.redmic.exception.common.ExceptionType;
import es.redmic.exception.common.InternalException;
import es.redmic.exception.common.InternalException;


@Configuration
public class EsClientProvider {
public class EsClientProvider {


	private TransportClient client;
	private RestHighLevelClient client;


	@Value("#{'${elastic.addresses}'.split(',')}")
	private List<String> addresses;
	private List<String> addresses;
	@Value("${elastic.port}")
	private Integer port;
	private Integer port;
	private String clusterName;
	@Value("${elastic.secured}")
	private Boolean secured;
	@Value("${elastic.user}")
	private String user;
	private String user;
	@Value("${elastic.password}")
	private String password;
	private String password;


	protected static Logger logger = LogManager.getLogger();
	protected static Logger logger = LogManager.getLogger();


	public EsClientProvider(EsConfig config) {
	public EsClientProvider() {
		this.addresses = config.getAddresses();
		this.port = config.getPort();
		this.clusterName = config.getClusterName();
		this.user = config.getUser();
		this.password = config.getPassword();
	}
	}


	public TransportClient getClient() {
	public RestHighLevelClient getClient() {
		if (client == null)
		if (client == null)
			connect();
			connect();
		return client;
		return client;
@@ -48,36 +52,40 @@ public class EsClientProvider {
	@PostConstruct
	@PostConstruct
	private void connect() {
	private void connect() {


		// @formatter:off
		String authorization = "";
		if (secured)
			authorization = user + ":" + password + "@";


		Settings settings = Settings.builder()
		List<HttpHost> hosts = new ArrayList<>();
				.put("cluster.name", this.clusterName)
		for (String address : addresses) {
				.build();


		String authorization = Base64.getEncoder().encodeToString((user + ":" + password).getBytes());
			hosts.add(new HttpHost(authorization + address, port, "http"));
		}


		// @formatter:on
		client = new RestHighLevelClient(RestClient.builder(hosts.toArray(new HttpHost[hosts.size()])));


		client = new PreBuiltTransportClient(settings);
		checkClusterHealth();
		client.threadPool().getThreadContext().putHeader("Authorization", "Basic " + authorization);
	}

	private void checkClusterHealth() {

		ClusterHealthResponse response;


		for (String address : addresses) {
		try {
		try {
				client.addTransportAddress(new TransportAddress(InetAddress.getByName(address), port));
			response = client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
			} catch (UnknownHostException e) {
		} catch (IOException e) {
				logger.warn(e.getMessage());
			e.printStackTrace();
			}
			throw new InternalException(ExceptionType.INTERNAL_EXCEPTION);
		}
		}


		List<DiscoveryNode> nodes = client.connectedNodes();
		if (response.getStatus().equals(ClusterHealthStatus.RED)) {
		if (nodes == null || nodes.isEmpty()) {
			logger.error("Imposible conectar con elastic. Cluster no saludable");
			// TODO: Añadir excepción propia
			throw new InternalException(ExceptionType.INTERNAL_EXCEPTION);
			throw new InternalException(ExceptionType.INTERNAL_EXCEPTION);
		}
		}
	}
	}


	@PreDestroy
	@PreDestroy
	private void disconnect() {
	private void disconnect() throws IOException {
		client.close();
		client.close();
	}
	}
}
}
+0 −63
Original line number Original line Diff line number Diff line
package es.redmic.elasticsearchlib.config;

import java.util.List;

public class EsConfig {

	private List<String> addresses;
	private Integer port;
	private String clusterName;
	private String user;
	private String password;

	public EsConfig() {
	}

	public EsConfig(List<String> addresses, Integer port, String clusterName, String user, String password) {
		this.addresses = addresses;
		this.port = port;
		this.clusterName = clusterName;
		this.user = user;
		this.password = password;
	}

	public List<String> getAddresses() {
		return addresses;
	}

	public void setAddresses(List<String> addresses) {
		this.addresses = addresses;
	}

	public Integer getPort() {
		return port;
	}

	public void setPort(Integer port) {
		this.port = port;
	}

	public String getClusterName() {
		return clusterName;
	}

	public void setClusterName(String clusterName) {
		this.clusterName = clusterName;
	}

	public String getUser() {
		return user;
	}

	public void setUser(String user) {
		this.user = user;
	}

	public String getPassword() {
		return password;
	}

	public void setPassword(String password) {
		this.password = password;
	}
}
+1 −1
Original line number Original line Diff line number Diff line
@@ -25,7 +25,7 @@ public abstract class RDataESRepository<TModel extends BaseES<?>, TQueryDTO exte
	public RDataESRepository() {
	public RDataESRepository() {
	}
	}


	public RDataESRepository(String[] index, String[] type) {
	public RDataESRepository(String[] index, String type) {
		super(index, type);
		super(index, type);
	}
	}


+5 −6
Original line number Original line Diff line number Diff line
@@ -19,7 +19,7 @@ public abstract class RWDataESRepository<TModel extends BaseES<?>, TQueryDTO ext
		super();
		super();
	}
	}


	public RWDataESRepository(String[] index, String[] type) {
	public RWDataESRepository(String[] index, String type) {
		super(index, type);
		super(index, type);
	}
	}


@@ -32,7 +32,7 @@ public abstract class RWDataESRepository<TModel extends BaseES<?>, TQueryDTO ext
			return checkInsert;
			return checkInsert;
		}
		}


		return elasticPersistenceUtils.save(getIndex()[0], getType()[0], modelToIndex, modelToIndex.getId().toString());
		return elasticPersistenceUtils.save(getIndex()[0], getType(), modelToIndex, modelToIndex.getId().toString());
	}
	}


	@Override
	@Override
@@ -44,14 +44,13 @@ public abstract class RWDataESRepository<TModel extends BaseES<?>, TQueryDTO ext
			return checkUpdate;
			return checkUpdate;
		}
		}


		return elasticPersistenceUtils.update(getIndex()[0], getType()[0], modelToIndex,
		return elasticPersistenceUtils.update(getIndex()[0], getType(), modelToIndex, modelToIndex.getId().toString());
				modelToIndex.getId().toString());
	}
	}


	@Override
	@Override
	public EventApplicationResult update(String id, XContentBuilder doc) {
	public EventApplicationResult update(String id, XContentBuilder doc) {


		return elasticPersistenceUtils.update(getIndex()[0], getType()[0], id, doc);
		return elasticPersistenceUtils.update(getIndex()[0], getType(), id, doc);
	}
	}


	@Override
	@Override
@@ -63,7 +62,7 @@ public abstract class RWDataESRepository<TModel extends BaseES<?>, TQueryDTO ext
			return checkDelete;
			return checkDelete;
		}
		}


		return elasticPersistenceUtils.delete(getIndex()[0], getType()[0], id);
		return elasticPersistenceUtils.delete(getIndex()[0], getType(), id);
	}
	}


	/*
	/*
+2 −2
Original line number Original line Diff line number Diff line
@@ -19,11 +19,11 @@ import es.redmic.models.es.geojson.wrapper.GeoSearchWrapper;
public abstract class RGeoDataESRepository<TModel extends Feature<?, ?>, TQueryDTO extends SimpleQueryDTO>
public abstract class RGeoDataESRepository<TModel extends Feature<?, ?>, TQueryDTO extends SimpleQueryDTO>
		extends RBaseESRepository<TModel, TQueryDTO> {
		extends RBaseESRepository<TModel, TQueryDTO> {


	public RGeoDataESRepository(String[] index, String[] type, Boolean rollOverIndex) {
	public RGeoDataESRepository(String[] index, String type, Boolean rollOverIndex) {
		super(index, type, rollOverIndex);
		super(index, type, rollOverIndex);
	}
	}


	public RGeoDataESRepository(String[] index, String[] type) {
	public RGeoDataESRepository(String[] index, String type) {
		super(index, type);
		super(index, type);
	}
	}