Newer
Older
casic-PTZ / casic-common / src / main / java / com / casic / missiles / es / ElasticSearchUtil.java
chaizhuang on 22 Apr 2024 17 KB es
package com.casic.missiles.es;

import cn.hutool.core.stream.CollectorUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.casic.missiles.dto.EsResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.poi.ss.formula.functions.T;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.*;

/**
 * ES工具类
 *
 * @author zhangyingjie123
 * @since 2020-11-30
 */
@Component
public class ElasticSearchUtil {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class);

    @Autowired
    private RestClient clientMapping;
    @Autowired
    private Environment env;

    private static String username;
    private static String password;

    private static RestClient client;

    @PostConstruct
    public void init() {
        client = clientMapping;
        username = env.getProperty("casic.data.es.username");
        password = env.getProperty("casic.data.es.password");
    }

    /**
     * @param index
     * @param type
     * @param id
     * @return
     * @throws IOException
     */
    public static JSONObject selectDocumentById(String index, String type, String id) throws IOException {
        String method = "GET";
        String endPoint = index.concat("/").concat(type).concat("/").concat(id);
        Response response = performRequest(method, endPoint, null);
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        return result.getJSONObject("_source");
    }

    /**
     * 根据id新增
     */
    public static Response addDocumentById(String index, String type, String id, Object entity) throws IOException {
        String method = "POST";
        String endPoint = index.concat("/").concat(type).concat("/");
        if (ObjectUtil.isNotEmpty(id)) {
            endPoint.concat(id);
        }
        endPoint.concat("?refresh=true");
//        logger.debug(entity.toString());
        return performRequest(method, endPoint, JSON.toJSONString(entity));
    }

    /**
     * 根据id删除
     */
    public static Response deleteDocumentById(String index, String type, String id) throws IOException {
        String method = "DELETE";
        String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true");
        return performRequest(method, endPoint, null);
    }

    /**
     * 根据id更新
     */
    public static Response updateDocumentById(String index, String type, String id, Object entity) throws IOException {
        String method = "POST";
        String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true");
        logger.debug(entity.toString());
        return performRequest(method, endPoint, JSON.toJSONString(entity));
    }

    /**
     * 根据条件查询
     */
    public static List<T> searchQuery(String index, String type, ElasticSearchQuery query) throws IOException {
        String method = "POST";
        String entPoint = index.concat("/").concat(type).concat("/").concat("_search");
        logger.debug(query.getBody().toString());
        Response response = performRequest(method, entPoint, query.getBody().toString());
        // 获取response body,转换为json对象
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        List<T> results = esResponse.getDatas();
        return results;
    }

    /**
     * 只支持1万条以下的下标查询
     * 根据条件查询返回分页
     */
    public static <T> Page<T> searchQueryPage(Page<T> page, String index, String type, ElasticSearchQuery query) throws IOException {
        String method = "POST";
        String entPoint = index.concat("/").concat(type).concat("/").concat("_search");
        logger.debug(query.getBody().toString());
        Response response = performRequest(method, entPoint, query.getBody().toString());
        // 获取response body,转换为json对象
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        List<T> results = esResponse.getDatas();
        page.setRecords(results);
        page.setTotal(Long.valueOf(esResponse.getHits().getTotal()));
        return page;
    }

    /**
     * 1、根据条件查询返回分页,小于1w条最直接返回
     * 2、超过1w条,通过回滚框分情况讨论
     */
    public static <T> Page<T> searchScrollQueryPage(Page<T> page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception {
        Long currentSize =(page.getCurrent()-1) * page.getSize();
        if (currentSize < 10000) {
            query.range("logTime", startTime, endTime);
            query.from(Integer.valueOf(String.valueOf(currentSize)));
            return searchQueryPage(page, index, type, query);
        } else {
            return budgetQueryPage(page, index, type, startTime, endTime, query);
        }
    }

    /**
     * 1、通过总条数/3600计算小时数
     * 2、获取第一条数据的时间差,然后再次进行计算
     * 3、预算查询,直到获取10000条内的数据,这样效率会远高于窗口滚动查询
     */
    private static <T> Page<T> budgetQueryPage(Page<T> page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception {
        String method = "POST";
        String entPoint = index.concat("/").concat(type).concat("/").concat("_search");
        query.range("logTime", startTime, endTime);
        Response response = performRequest(method, entPoint, query.getBody().toString());
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        page.setTotal(Long.valueOf(esResponse.getHits().getTotal()));
        JSONObject jsonObject = getOffsetTime(page, startTime, endTime, entPoint, query);
        if (ObjectUtils.isEmpty(jsonObject)) {
            return page;
        }
        query.removeRange();
        query.ranges("logTime", startTime, (String) jsonObject.get("time"));
        query.from(Integer.valueOf(String.valueOf(jsonObject.get("from"))));
        response = performRequest(method, entPoint, query.getBody().toString());
        result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        esResponse = result.toJavaObject(EsResponse.class);
        List<T> results = esResponse.getDatas();
        page.setRecords(results);
        return page;
    }

    private static <T> JSONObject getOffsetTime(Page<T> page, String startTime, String endTime, String entPoint, ElasticSearchQuery query) throws Exception {
        Long currentSize = page.getSize() * (page.getCurrent()-1);
        String newEndTime = "";
        Long offsetHour = currentSize / 3600;
        newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour)));
        while (true) {
            EsResponse leftTotal = mergerQueryTime(startTime, newEndTime, query, entPoint);
            Integer rightTotal = mergerQuery(newEndTime, endTime, query, entPoint);
            if (currentSize - rightTotal < 10000 && currentSize - rightTotal > 0) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("time", newEndTime);
                jsonObject.put("from", currentSize - rightTotal);
                return jsonObject;
            }
            //最近是空数据
            if (rightTotal == 0 || currentSize - rightTotal > 0) {
                currentSize -= rightTotal;
                if (currentSize > Long.valueOf(leftTotal.getHits().getTotal())) {
                    return null;
                }
                //重新偏移坐标,计算坐标
                if (CollectionUtils.isEmpty(leftTotal.getDatas()) || Long.valueOf(leftTotal.getHits().getTotal()) <10000) {
                    return null;
                }
                newEndTime = (String) ((JSONObject) leftTotal.getDatas().get(0)).get("logTime");
                endTime = newEndTime;
                newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour)));
            } else if (currentSize - rightTotal < 0) {
                double multiple = (double) rightTotal / currentSize;
                offsetHour = (long) (offsetHour / multiple);
                newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour)));
            }
        }
    }


    private static EsResponse mergerQueryTime(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception {
        String method = "POST";
        query.removeRange();
        query.ranges("logTime", beginTime, endTime);
        logger.debug(query.getBody().toString());
        Response response = performRequest(method, entPoint, query.getBody().toString());
        // 获取response body,转换为json对象
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        return esResponse;
    }

    private static Integer mergerQuery(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception {
        String method = "POST";
        query.removeRange();
        query.ranges("logTime", beginTime, endTime);
        Response response = performRequest(method, entPoint, query.getBody().toString());
        // 获取response body,转换为json对象
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        int total = Integer.valueOf(esResponse.getHits().getTotal());
        return total;
    }

    /**
     * 获取偏移时间
     *
     * @param beginTime
     * @param offsetHour
     * @return
     * @throws Exception
     */
    private static String getOffsetTime(String beginTime, Integer offsetHour) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (StringUtils.isEmpty(beginTime)) {
            beginTime = format.format(new Date());
        }
        //获取上一条最后的保存时间与当前时间进行相减,与保存间隔相比较
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(format.parse(beginTime));
        calendar.add(Calendar.HOUR_OF_DAY, offsetHour);
        return format.format(calendar.getTime());
    }

    /**
     * 1w条以上的通过滑动窗口实现
     * 太慢了,丢弃
     *
     * @param index
     * @param type
     * @param query
     * @param <T>
     * @return
     * @throws IOException
     */
    public static <T> Page<T> searchQueryScrollPage(Page<T> page, String index, String type, ElasticSearchQuery query) throws
            IOException {
        String method = "POST";
        List<T> results = new ArrayList<>();
        query.size(10000L);
        Long currentSize = page.getSize() * page.getCurrent() + page.getSize();
        String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m");
        logger.debug(query.getBody().toString());
        Response response = performRequest(method, entPoint, query.getBody().toString());
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        page.setTotal(Long.valueOf(esResponse.getHits().getTotal()));
        if (currentSize >= page.getTotal()) {
            return page;
        }
        currentSize -= 10000;
        String scrollId = result.getString("_scroll_id");
        entPoint = "_search/scroll";
        JSONObject scrollBody = new JSONObject();
        scrollBody.put("scroll", "1m");
        scrollBody.put("scroll_id", scrollId);
        method = "GET";
        Long tempSize = currentSize > 10000 ? 10000 : currentSize;
        while (esResponse.getDatas().size() > 0 && currentSize > 0) {
            query.size(tempSize);
            response = performRequest(method, entPoint, scrollBody.toString());
            currentSize -= currentSize > 10000 ? 10000 : currentSize;
        }
        result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        esResponse = result.toJavaObject(EsResponse.class);
        if (ObjectUtil.isNotEmpty(esResponse.getDatas())) {
            results = esResponse.getDatas();
        }
        // 查询完成后,及时删除scroll,释放资源
        scrollBody.remove("scroll");
        method = "DELETE";
        performRequest(method, entPoint, scrollBody.toString());
        logger.debug(query.getBody().toString());
        Long beginIndex = (page.getSize() * page.getCurrent()) / 10000;
        results = results.subList(Integer.valueOf(String.valueOf(beginIndex)), Integer.valueOf(String.valueOf(beginIndex + page.getSize())));
        page.setRecords(results);
        return page;
    }

    /**
     * 根据条件进行滚动查询
     */
    public static <T> List<T> searchQueryScroll(String index, String type, ElasticSearchQuery query) throws
            IOException {
        // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’
        List<T> results = new ArrayList<>();
        String method = "POST";
        String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m");
        logger.debug(query.getBody().toString());
        Response response = performRequest(method, entPoint, query.getBody().toString());
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        String scrollId = result.getString("_scroll_id");
        if (ObjectUtil.isNotEmpty(esResponse.getDatas())) {
            results.addAll(esResponse.getDatas());
        }
        // 循环发送scroll请求,直到返回结果为空
        entPoint = "_search/scroll";
        JSONObject scrollBody = new JSONObject();
        scrollBody.put("scroll", "1m");
        scrollBody.put("scroll_id", scrollId);
        while (esResponse.getDatas().size() > 0) {
            method = "GET";
            response = performRequest(method, entPoint, scrollBody.toString());
            result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
            esResponse = result.toJavaObject(EsResponse.class);
            if (ObjectUtil.isNotEmpty(esResponse.getDatas())) {
                results.addAll(esResponse.getDatas());
            }
        }
        // 查询完成后,及时删除scroll,释放资源
        scrollBody.remove("scroll");
        method = "DELETE";
        performRequest(method, entPoint, scrollBody.toString());
        return results;
    }


    /**
     * 批处理
     */
    public static Response bulk(String index, String type, String entity) throws IOException {
        String method = "POST";
        StringBuilder endPoint = new StringBuilder();
        if (ObjectUtil.isNotEmpty(index)) {
            endPoint.append(index).append("/");
        }
        if (ObjectUtil.isNotEmpty(type)) {
            endPoint.append(type).append("/");
        }
        endPoint.append("_bulk");
        endPoint.append("?refresh=true");
        logger.debug(entity);
        return performRequest(method, endPoint.toString(), entity);
    }

    /**
     * 请求操作通用类
     */
    private static Response performRequest(String method, String endpoint, String entity) throws IOException {
        //logger.info(method + " " + endpoint);
        Request request = new Request(method, endpoint);
        if (ObjectUtil.isNotEmpty(entity)) {
            request.setEntity(new NStringEntity(entity, "utf-8"));
        }
        if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) {
            RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
            String token = username.concat(":").concat(password);
            String base64encode = Base64.getEncoder().encodeToString(token.getBytes("utf-8"));
            builder.addHeader("Authorization", "Basic " + base64encode);
            request.setOptions(builder.build());
        }
        Response response = client.performRequest(request);
//        logger.debug(response.toString());
        return response;
    }

}