package com.casic.missiles.es; import cn.hutool.core.stream.CollectorUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; import org.apache.commons.lang3.StringUtils; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.apache.poi.ss.formula.functions.T; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; import java.io.IOException; import java.text.SimpleDateFormat; import java.time.LocalTime; import java.time.temporal.ChronoUnit; import java.util.*; /** * ES工具类 * * @author zhangyingjie123 * @since 2020-11-30 */ @Component public class ElasticSearchUtil { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class); @Autowired private RestClient clientMapping; @Autowired private Environment env; private static String username; private static String password; private static RestClient client; @PostConstruct public void init() { client = clientMapping; username = env.getProperty("casic.data.es.username"); password = env.getProperty("casic.data.es.password"); } /** * @param index * @param type * @param id * @return * @throws IOException */ public static JSONObject selectDocumentById(String index, String type, String id) throws IOException { String method = "GET"; String endPoint = index.concat("/").concat(type).concat("/").concat(id); Response response = performRequest(method, endPoint, null); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); return result.getJSONObject("_source"); } /** * 根据id新增 */ public static Response addDocumentById(String index, String type, String id, Object entity) throws IOException { String method = "POST"; String endPoint = index.concat("/").concat(type).concat("/"); if (ObjectUtil.isNotEmpty(id)) { endPoint.concat(id); } endPoint.concat("?refresh=true"); // logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } /** * 根据id删除 */ public static Response deleteDocumentById(String index, String type, String id) throws IOException { String method = "DELETE"; String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true"); return performRequest(method, endPoint, null); } /** * 根据id更新 */ public static Response updateDocumentById(String index, String type, String id, Object entity) throws IOException { String method = "POST"; String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true"); logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } /** * 根据条件查询 */ public static List<T> searchQuery(String index, String type, ElasticSearchQuery query) throws IOException { String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); // 获取response body,转换为json对象 JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); List<T> results = esResponse.getDatas(); return results; } /** * 只支持1万条以下的下标查询 * 根据条件查询返回分页 */ public static <T> Page<T> searchQueryPage(Page<T> page, String index, String type, ElasticSearchQuery query) throws IOException { String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); // 获取response body,转换为json对象 JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); List<T> results = esResponse.getDatas(); page.setRecords(results); page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); return page; } /** * 1、根据条件查询返回分页,小于1w条最直接返回 * 2、超过1w条,通过回滚框分情况讨论 */ public static <T> Page<T> searchScrollQueryPage(Page<T> page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { Long currentSize =(page.getCurrent()-1) * page.getSize(); if (currentSize < 10000) { query.range("logTime", startTime, endTime); query.from(Integer.valueOf(String.valueOf(currentSize))); return searchQueryPage(page, index, type, query); } else { return budgetQueryPage(page, index, type, startTime, endTime, query); } } /** * 1、通过总条数/3600计算小时数 * 2、获取第一条数据的时间差,然后再次进行计算 * 3、预算查询,直到获取10000条内的数据,这样效率会远高于窗口滚动查询 */ private static <T> Page<T> budgetQueryPage(Page<T> page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); query.range("logTime", startTime, endTime); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); JSONObject jsonObject = getOffsetTime(page, startTime, endTime, entPoint, query); if (ObjectUtils.isEmpty(jsonObject)) { return page; } query.removeRange(); query.ranges("logTime", startTime, (String) jsonObject.get("time")); query.from(Integer.valueOf(String.valueOf(jsonObject.get("from")))); response = performRequest(method, entPoint, query.getBody().toString()); result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); esResponse = result.toJavaObject(EsResponse.class); List<T> results = esResponse.getDatas(); page.setRecords(results); return page; } private static <T> JSONObject getOffsetTime(Page<T> page, String startTime, String endTime, String entPoint, ElasticSearchQuery query) throws Exception { Long currentSize = page.getSize() * (page.getCurrent()-1); String newEndTime = ""; Long offsetHour = currentSize / 3600; newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); while (true) { EsResponse leftTotal = mergerQueryTime(startTime, newEndTime, query, entPoint); Integer rightTotal = mergerQuery(newEndTime, endTime, query, entPoint); if (currentSize - rightTotal < 10000 && currentSize - rightTotal > 0) { JSONObject jsonObject = new JSONObject(); jsonObject.put("time", newEndTime); jsonObject.put("from", currentSize - rightTotal); return jsonObject; } //最近是空数据 if (rightTotal == 0 || currentSize - rightTotal > 0) { currentSize -= rightTotal; if (currentSize > Long.valueOf(leftTotal.getHits().getTotal())) { return null; } //重新偏移坐标,计算坐标 if (CollectionUtils.isEmpty(leftTotal.getDatas()) || Long.valueOf(leftTotal.getHits().getTotal()) <10000) { return null; } newEndTime = (String) ((JSONObject) leftTotal.getDatas().get(0)).get("logTime"); endTime = newEndTime; newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); } else if (currentSize - rightTotal < 0) { double multiple = (double) rightTotal / currentSize; offsetHour = (long) (offsetHour / multiple); newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); } } } private static EsResponse mergerQueryTime(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { String method = "POST"; query.removeRange(); query.ranges("logTime", beginTime, endTime); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); // 获取response body,转换为json对象 JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); return esResponse; } private static Integer mergerQuery(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { String method = "POST"; query.removeRange(); query.ranges("logTime", beginTime, endTime); Response response = performRequest(method, entPoint, query.getBody().toString()); // 获取response body,转换为json对象 JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); int total = Integer.valueOf(esResponse.getHits().getTotal()); return total; } /** * 获取偏移时间 * * @param beginTime * @param offsetHour * @return * @throws Exception */ private static String getOffsetTime(String beginTime, Integer offsetHour) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); if (StringUtils.isEmpty(beginTime)) { beginTime = format.format(new Date()); } //获取上一条最后的保存时间与当前时间进行相减,与保存间隔相比较 Calendar calendar = Calendar.getInstance(); calendar.setTime(format.parse(beginTime)); calendar.add(Calendar.HOUR_OF_DAY, offsetHour); return format.format(calendar.getTime()); } /** * 1w条以上的通过滑动窗口实现 * 太慢了,丢弃 * * @param index * @param type * @param query * @param <T> * @return * @throws IOException */ public static <T> Page<T> searchQueryScrollPage(Page<T> page, String index, String type, ElasticSearchQuery query) throws IOException { String method = "POST"; List<T> results = new ArrayList<>(); query.size(10000L); Long currentSize = page.getSize() * page.getCurrent() + page.getSize(); String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); if (currentSize >= page.getTotal()) { return page; } currentSize -= 10000; String scrollId = result.getString("_scroll_id"); entPoint = "_search/scroll"; JSONObject scrollBody = new JSONObject(); scrollBody.put("scroll", "1m"); scrollBody.put("scroll_id", scrollId); method = "GET"; Long tempSize = currentSize > 10000 ? 10000 : currentSize; while (esResponse.getDatas().size() > 0 && currentSize > 0) { query.size(tempSize); response = performRequest(method, entPoint, scrollBody.toString()); currentSize -= currentSize > 10000 ? 10000 : currentSize; } result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); esResponse = result.toJavaObject(EsResponse.class); if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { results = esResponse.getDatas(); } // 查询完成后,及时删除scroll,释放资源 scrollBody.remove("scroll"); method = "DELETE"; performRequest(method, entPoint, scrollBody.toString()); logger.debug(query.getBody().toString()); Long beginIndex = (page.getSize() * page.getCurrent()) / 10000; results = results.subList(Integer.valueOf(String.valueOf(beginIndex)), Integer.valueOf(String.valueOf(beginIndex + page.getSize()))); page.setRecords(results); return page; } /** * 根据条件进行滚动查询 */ public static <T> List<T> searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List<T> results = new ArrayList<>(); String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); String scrollId = result.getString("_scroll_id"); if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { results.addAll(esResponse.getDatas()); } // 循环发送scroll请求,直到返回结果为空 entPoint = "_search/scroll"; JSONObject scrollBody = new JSONObject(); scrollBody.put("scroll", "1m"); scrollBody.put("scroll_id", scrollId); while (esResponse.getDatas().size() > 0) { method = "GET"; response = performRequest(method, entPoint, scrollBody.toString()); result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); esResponse = result.toJavaObject(EsResponse.class); if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { results.addAll(esResponse.getDatas()); } } // 查询完成后,及时删除scroll,释放资源 scrollBody.remove("scroll"); method = "DELETE"; performRequest(method, entPoint, scrollBody.toString()); return results; } /** * 批处理 */ public static Response bulk(String index, String type, String entity) throws IOException { String method = "POST"; StringBuilder endPoint = new StringBuilder(); if (ObjectUtil.isNotEmpty(index)) { endPoint.append(index).append("/"); } if (ObjectUtil.isNotEmpty(type)) { endPoint.append(type).append("/"); } endPoint.append("_bulk"); endPoint.append("?refresh=true"); logger.debug(entity); return performRequest(method, endPoint.toString(), entity); } /** * 请求操作通用类 */ private static Response performRequest(String method, String endpoint, String entity) throws IOException { //logger.info(method + " " + endpoint); Request request = new Request(method, endpoint); if (ObjectUtil.isNotEmpty(entity)) { request.setEntity(new NStringEntity(entity, "utf-8")); } if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); String token = username.concat(":").concat(password); String base64encode = Base64.getEncoder().encodeToString(token.getBytes("utf-8")); builder.addHeader("Authorization", "Basic " + base64encode); request.setOptions(builder.build()); } Response response = client.performRequest(request); // logger.debug(response.toString()); return response; } }