Newer
Older
casic-PTZ / casic-common / src / main / java / com / casic / missiles / es / ElasticSearchUtil.java
casic_zt on 2 Feb 2024 8 KB 数据推送优化
package com.casic.missiles.es;

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.casic.missiles.dto.EsResponse;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.poi.ss.formula.functions.T;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/**
 * ES工具类
 *
 * @author zhangyingjie123
 * @since 2020-11-30
 */
@Component
public class ElasticSearchUtil {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class);

    @Autowired
    private RestClient clientMapping;
    @Autowired
    private Environment env;

    private static String username;
    private static String password;

    private static RestClient client;

    @PostConstruct
    public void init() {
        client = clientMapping;
        username = env.getProperty("casic.data.es.username");
        password = env.getProperty("casic.data.es.password");
    }

    /**
     * @param index
     * @param type
     * @param id
     * @return
     * @throws IOException
     */
    public static JSONObject selectDocumentById(String index, String type, String id) throws IOException {
        String method = "GET";
        String endPoint = index.concat("/").concat(type).concat("/").concat(id);
        Response response = performRequest(method, endPoint, null);
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        return result.getJSONObject("_source");
    }

    /**
     * 根据id新增
     */
    public static Response addDocumentById(String index, String type, String id, Object entity) throws IOException {
        String method = "POST";
        String endPoint = index.concat("/").concat(type).concat("/");
        if (ObjectUtil.isNotEmpty(id)) {
            endPoint.concat(id);
        }
        endPoint.concat("?refresh=true");
//        logger.debug(entity.toString());
        return performRequest(method, endPoint, JSON.toJSONString(entity));
    }

    /**
     * 根据id删除
     */
    public static Response deleteDocumentById(String index, String type, String id) throws IOException {
        String method = "DELETE";
        String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true");
        return performRequest(method, endPoint, null);
    }

    /**
     * 根据id更新
     */
    public static Response updateDocumentById(String index, String type, String id, Object entity) throws IOException {
        String method = "POST";
        String endPoint = index.concat("/").concat(type).concat("/").concat(id).concat("?refresh=true");
        logger.debug(entity.toString());
        return performRequest(method, endPoint, JSON.toJSONString(entity));
    }

    /**
     * 根据条件查询
     */
    public static List<T> searchQuery(String index, String type, ElasticSearchQuery query) throws IOException {
        String method = "POST";
        String entPoint = index.concat("/").concat(type).concat("/").concat("_search");
        logger.debug(query.getBody().toString());
        Response response = performRequest(method, entPoint, query.getBody().toString());
        // 获取response body,转换为json对象
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        List<T> results = esResponse.getDatas();
        return results;
    }

    /**
     * 根据条件查询返回分页
     */
    public  static<T> Page<T>  searchQueryPage(Page<T> page, String index, String type, ElasticSearchQuery query) throws IOException {
        String method = "POST";
        String entPoint = index.concat("/").concat(type).concat("/").concat("_search");
        logger.debug(query.getBody().toString());
        Response response = performRequest(method, entPoint, query.getBody().toString());
        // 获取response body,转换为json对象
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        List<T> results = esResponse.getDatas();
        page.setRecords(results);
        page.setTotal(Long.valueOf(esResponse.getHits().getTotal()));
        return page;
    }

    /**
     * 根据条件进行滚动查询
     */
    public static<T>  List<T> searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException {
        // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’
        List<T> results = new ArrayList<>();
        String method = "POST";
        String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m");
        logger.debug(query.getBody().toString());
        Response response = performRequest(method, entPoint, query.getBody().toString());
        JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        EsResponse esResponse = result.toJavaObject(EsResponse.class);
        String scrollId = result.getString("_scroll_id");
        if (ObjectUtil.isNotEmpty(esResponse.getDatas())) {
            results.addAll(esResponse.getDatas());
        }
        // 循环发送scroll请求,直到返回结果为空
        entPoint = "_search/scroll";
        JSONObject scrollBody = new JSONObject();
        scrollBody.put("scroll", "1m");
        scrollBody.put("scroll_id", scrollId);
        while (esResponse.getDatas().size() > 0) {
            method = "GET";
            response = performRequest(method, entPoint, scrollBody.toString());
            result = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
            esResponse = result.toJavaObject(EsResponse.class);
            if (ObjectUtil.isNotEmpty(esResponse.getDatas())) {
                results.addAll(esResponse.getDatas());
            }
        }
        // 查询完成后,及时删除scroll,释放资源
        scrollBody.remove("scroll");
        method = "DELETE";
        performRequest(method, entPoint, scrollBody.toString());
        return results;
    }


    /**
     * 批处理
     */
    public static Response bulk(String index, String type, String entity) throws IOException {
        String method = "POST";
        StringBuilder endPoint = new StringBuilder();
        if (ObjectUtil.isNotEmpty(index)) {
            endPoint.append(index).append("/");
        }
        if (ObjectUtil.isNotEmpty(type)) {
            endPoint.append(type).append("/");
        }
        endPoint.append("_bulk");
        endPoint.append("?refresh=true");
        logger.debug(entity);
        return performRequest(method, endPoint.toString(), entity);
    }

    /**
     * 请求操作通用类
     */
    private static Response performRequest(String method, String endpoint, String entity) throws IOException {
        //logger.info(method + " " + endpoint);
        Request request = new Request(method, endpoint);
        if (ObjectUtil.isNotEmpty(entity)) {
            request.setEntity(new NStringEntity(entity, "utf-8"));
        }

        if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) {
            RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
            String token = username.concat(":").concat(password);
            String base64encode = Base64.getEncoder().encodeToString(token.getBytes("utf-8"));
            builder.addHeader("Authorization", "Basic " + base64encode);
            request.setOptions(builder.build());
        }
        Response response = client.performRequest(request);
//        logger.debug(response.toString());
        return response;
    }

}