package com.casic.missiles.es; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.apache.poi.ss.formula.functions.T; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.ArrayList; import java.util.Base64; import java.util.List; /** * ES工具类 * * @author zhangyingjie123 * @since 2020-11-30 */ @Component public class ElasticSearchUtil { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class); @Autowired private RestClient clientMapping; @Autowired private Environment env; private static String username; private static String password; private static RestClient client; @PostConstruct public void init() { client = clientMapping; username = env.getProperty("casic.data.es.username"); password = env.getProperty("casic.data.es.password"); } /** * @param index * @param type * @param id * @return * @throws IOException */ public static JSONObject selectDocumentById(String index, String type, String id) throws IOException { String method = "GET"; String endPoint = index.concat("/").concat(type).concat("/").concat(id); Response response = performRequest(method, endPoint, null); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); return result.getJSONObject("_source"); } /** * 根据id新增 */ public static Response addDocumentById(String index, String type, String id, Object entity) throws IOException { String method = "POST"; String endPoint = index.concat("/").concat(type).concat("/"); if (ObjectUtil.isNotEmpty(id)) { endPoint.concat(id); } endPoint.concat("?refresh=true"); // logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } /** * 根据id删除 */ public static Response deleteDocumentById(String index, String type, String id) throws IOException { String method = "DELETE"; String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true"); return performRequest(method, endPoint, null); } /** * 根据id更新 */ public static Response updateDocumentById(String index, String type, String id, Object entity) throws IOException { String method = "POST"; String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true"); logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } /** * 根据条件查询 */ public static List<T> searchQuery(String index, String type, ElasticSearchQuery query) throws IOException { String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); // 获取response body,转换为json对象 JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); List<T> results = esResponse.getDatas(); return results; } /** * 根据条件查询返回分页 */ public static<T> Page<T> searchQueryPage(Page<T> page, String index, String type, ElasticSearchQuery query) throws IOException { String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); // 获取response body,转换为json对象 JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); List<T> results = esResponse.getDatas(); page.setRecords(results); page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); return page; } /** * 根据条件进行滚动查询 */ public static<T> List<T> searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List<T> results = new ArrayList<>(); String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); String scrollId = result.getString("_scroll_id"); if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { results.addAll(esResponse.getDatas()); } // 循环发送scroll请求,直到返回结果为空 entPoint = "_search/scroll"; JSONObject scrollBody = new JSONObject(); scrollBody.put("scroll", "1m"); scrollBody.put("scroll_id", scrollId); while (esResponse.getDatas().size() > 0) { method = "GET"; response = performRequest(method, entPoint, scrollBody.toString()); result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); esResponse = result.toJavaObject(EsResponse.class); if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { results.addAll(esResponse.getDatas()); } } // 查询完成后,及时删除scroll,释放资源 scrollBody.remove("scroll"); method = "DELETE"; performRequest(method, entPoint, scrollBody.toString()); return results; } /** * 批处理 */ public static Response bulk(String index, String type, String entity) throws IOException { String method = "POST"; StringBuilder endPoint = new StringBuilder(); if (ObjectUtil.isNotEmpty(index)) { endPoint.append(index).append("/"); } if (ObjectUtil.isNotEmpty(type)) { endPoint.append(type).append("/"); } endPoint.append("_bulk"); endPoint.append("?refresh=true"); logger.debug(entity); return performRequest(method, endPoint.toString(), entity); } /** * 请求操作通用类 */ private static Response performRequest(String method, String endpoint, String entity) throws IOException { //logger.info(method + " " + endpoint); Request request = new Request(method, endpoint); if (ObjectUtil.isNotEmpty(entity)) { request.setEntity(new NStringEntity(entity, "utf-8")); } if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); String token = username.concat(":").concat(password); String base64encode = Base64.getEncoder().encodeToString(token.getBytes("utf-8")); builder.addHeader("Authorization", "Basic " + base64encode); request.setOptions(builder.build()); } Response response = client.performRequest(request); // logger.debug(response.toString()); return response; } }