package com.casic.missiles.es; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.casic.missiles.core.es.EsResponse; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.ArrayList; import java.util.Base64; import java.util.List; /** * ES工具类 * * @author zhangyingjie123 * @since 2020-11-30 */ @Component public class ElasticSearchUtil { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class); @Autowired private RestClient clientMapping; @Autowired private Environment env; private static String username; private static String password; private static RestClient client; @PostConstruct public void init(){ client = clientMapping; username = env.getProperty("casic.data.es.username"); password = env.getProperty("casic.data.es.password"); } public static JSONObject selectDocumentById(String index, String type, String id) throws IOException{ String method = "GET"; String endPoint = index.concat("/").concat(type).concat("/").concat(id); Response response = performRequest(method,endPoint,null); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); return result.getJSONObject("_source"); } public static Response addDocumentById(String index, String type, String id, Object entity) throws IOException{ String method = "POST"; String endPoint = index.concat("/").concat(type).concat("/"); if(ObjectUtil.isNotEmpty(id)){ endPoint.concat(id); } endPoint.concat("?refresh=true"); logger.debug(entity.toString()); return performRequest(method,endPoint,JSON.toJSONString(entity)); } public static Response deleteDocumentById(String index, String type, String id) throws IOException{ String method = "DELETE"; String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true"); return performRequest(method,endPoint,null); } public static Response updateDocumentById(String index, String type, String id, Object entity) throws IOException{ String method = "POST"; String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true"); logger.debug(entity.toString()); return performRequest(method,endPoint,JSON.toJSONString(entity)); } public static EsResponse searchQuery(String index, String type, ElasticSearchQuery query) throws IOException{ String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); // 获取response body,转换为json对象 JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); return result.toJavaObject(EsResponse.class); } public static Response aggsQuery(String index, String type, ElasticSearchQuery query) throws IOException{ String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); return response; } public static List<Object> searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException{ // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List<Object> results = new ArrayList<>(); String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); String scrollId = result.getString("_scroll_id"); if(ObjectUtil.isNotEmpty(esResponse.getDatas())){ results.addAll(esResponse.getDatas()); } // 循环发送scroll请求,直到返回结果为空 entPoint = "_search/scroll"; JSONObject scrollBody = new JSONObject(); scrollBody.put("scroll","1m"); scrollBody.put("scroll_id",scrollId); while (esResponse.getDatas().size() > 0){ method = "GET"; response = performRequest(method, entPoint, scrollBody.toString()); result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); esResponse = result.toJavaObject(EsResponse.class); if(ObjectUtil.isNotEmpty(esResponse.getDatas())){ results.addAll(esResponse.getDatas()); } } // 查询完成后,及时删除scroll,释放资源 scrollBody.remove("scroll"); method = "DELETE"; performRequest(method, entPoint, scrollBody.toString()); return results; } public static Response bulk(String index,String type,String entity) throws IOException{ String method = "POST"; StringBuilder endPoint = new StringBuilder(); if(ObjectUtil.isNotEmpty(index)){ endPoint.append(index).append("/"); } if(ObjectUtil.isNotEmpty(type)){ endPoint.append(type).append("/"); } endPoint.append("_bulk"); endPoint.append("?refresh=true"); logger.debug(entity); return performRequest(method,endPoint.toString(),entity); } private static Response performRequest(String method, String endpoint, String entity) throws IOException { //logger.info(method + " " + endpoint); Request request = new Request(method,endpoint); if(ObjectUtil.isNotEmpty(entity)){ request.setEntity(new NStringEntity(entity,"utf-8")); } if(ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)){ RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); String token = username.concat(":").concat(password); String base64encode = Base64.getEncoder().encodeToString(token.getBytes("utf-8")); builder.addHeader("Authorization", "Basic " + base64encode ); request.setOptions(builder.build()); } Response response = client.performRequest(request); logger.debug(response.toString()); return response; } }