// Repository viewer navigation: Newer / Older
// Path: casic-PTZ / casic-common / src / main / java / com / casic / missiles / es / ElasticSearchUtil.java
// Last change: casic_zt on 21 Apr 2023, 7 KB, commit message "es提交" (ES commit)
package com.casic.missiles.es;

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.casic.missiles.core.es.EsResponse;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/**
 * Elasticsearch utility class.
 *
 * <p>Exposes static helpers (document CRUD, search, scroll search, bulk) that send
 * requests through a {@link RestClient}. The client and the optional basic-auth
 * credentials are injected into this component and copied into static fields by
 * {@link #init()}; calling any static helper before {@code init()} has run fails
 * because the static client is still {@code null}.
 *
 * @author zhangyingjie123
 * @since 2020-11-30
 */
@Component
public class ElasticSearchUtil {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class);

    /** Query-string suffix appended to write requests. */
    private static final String REFRESH_PARAM = "?refresh=true";
    /** Keep-alive value sent with the initial scroll search and every scroll continuation. */
    private static final String SCROLL_KEEP_ALIVE = "1m";
    /** Endpoint used both to fetch the next scroll page and to delete the scroll. */
    private static final String SCROLL_ENDPOINT = "_search/scroll";

    @Autowired
    private RestClient clientMapping;
    @Autowired
    private Environment env;

    private static String username;
    private static String password;

    private static RestClient client;

    /**
     * Copies the injected client and the configured credentials into the static fields
     * used by the static helper methods.
     */
    @PostConstruct
    public void init() {
        client = clientMapping;
        username = env.getProperty("casic.data.es.username");
        password = env.getProperty("casic.data.es.password");
    }

    /**
     * Fetches one document with {@code GET index/type/id}.
     *
     * @param index index name
     * @param type  type name
     * @param id    document id
     * @return the {@code _source} object of the response body
     * @throws IOException if the request or reading the response fails
     */
    public static JSONObject selectDocumentById(String index, String type, String id) throws IOException {
        String endPoint = typeEndpoint(index, type).concat(id);
        JSONObject result = parseBody(performRequest("GET", endPoint, null));
        return result.getJSONObject("_source");
    }

    /**
     * Indexes a document with {@code POST index/type/[id]?refresh=true}.
     *
     * <p>The previous implementation discarded the results of {@code String.concat},
     * so neither the id nor the refresh parameter was ever sent. Both are now part of
     * the endpoint; when {@code id} is empty the endpoint stays {@code index/type/}
     * plus the refresh parameter.
     *
     * @param index  index name
     * @param type   type name
     * @param id     document id; may be null or empty
     * @param entity object serialized to JSON as the request body
     * @return the raw response
     * @throws IOException if the request fails
     */
    public static Response addDocumentById(String index, String type, String id, Object entity) throws IOException {
        StringBuilder endPoint = new StringBuilder(typeEndpoint(index, type));
        if (ObjectUtil.isNotEmpty(id)) {
            endPoint.append(id);
        }
        endPoint.append(REFRESH_PARAM);
        logger.debug("{}", entity);
        return performRequest("POST", endPoint.toString(), JSON.toJSONString(entity));
    }

    /**
     * Deletes a document with {@code DELETE index/type/id?refresh=true}.
     *
     * @param index index name
     * @param type  type name
     * @param id    document id
     * @return the raw response
     * @throws IOException if the request fails
     */
    public static Response deleteDocumentById(String index, String type, String id) throws IOException {
        String endPoint = typeEndpoint(index, type).concat(id).concat(REFRESH_PARAM);
        return performRequest("DELETE", endPoint, null);
    }

    /**
     * Writes a document with {@code POST index/type/id?refresh=true}.
     *
     * @param index  index name
     * @param type   type name
     * @param id     document id
     * @param entity object serialized to JSON as the request body
     * @return the raw response
     * @throws IOException if the request fails
     */
    public static Response updateDocumentById(String index, String type, String id, Object entity) throws IOException {
        String endPoint = typeEndpoint(index, type).concat(id).concat(REFRESH_PARAM);
        logger.debug("{}", entity);
        return performRequest("POST", endPoint, JSON.toJSONString(entity));
    }

    /**
     * Runs a search with {@code POST index/type/_search} and maps the response body
     * to an {@link EsResponse}.
     *
     * @param index index name
     * @param type  type name
     * @param query query whose body is sent as the request body
     * @return the response body converted to {@link EsResponse}
     * @throws IOException if the request or reading the response fails
     */
    public static EsResponse searchQuery(String index, String type, ElasticSearchQuery query) throws IOException {
        String queryBody = query.getBody().toString();
        logger.debug("{}", queryBody);
        JSONObject result = parseBody(performRequest("POST", searchEndpoint(index, type), queryBody));
        return result.toJavaObject(EsResponse.class);
    }

    /**
     * Runs a search with {@code POST index/type/_search} and returns the response
     * unparsed, leaving interpretation of the body to the caller.
     *
     * @param index index name
     * @param type  type name
     * @param query query whose body is sent as the request body
     * @return the raw response
     * @throws IOException if the request fails
     */
    public static Response aggsQuery(String index, String type, ElasticSearchQuery query) throws IOException {
        String queryBody = query.getBody().toString();
        logger.debug("{}", queryBody);
        return performRequest("POST", searchEndpoint(index, type), queryBody);
    }

    /**
     * Runs a scroll search and collects the data of every page into one list.
     *
     * <p>The first request goes to {@code index/type/_search?scroll=1m}; further pages
     * are fetched from {@code _search/scroll} until a page has no data. The scroll is
     * deleted afterwards, also when fetching a page fails.
     *
     * @param index index name
     * @param type  type name
     * @param query query whose body is sent with the first request
     * @return all collected entries; empty when the first page has no data
     * @throws IOException if any request or reading a response fails
     */
    public static List<Object> searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException {
        List<Object> results = new ArrayList<>();
        String queryBody = query.getBody().toString();
        logger.debug("{}", queryBody);

        // First request submits the query and opens the scroll.
        String firstEndPoint = searchEndpoint(index, type).concat("?scroll=").concat(SCROLL_KEEP_ALIVE);
        JSONObject result = parseBody(performRequest("POST", firstEndPoint, queryBody));
        String scrollId = result.getString("_scroll_id");

        try {
            EsResponse page = result.toJavaObject(EsResponse.class);
            // isNotEmpty also covers a null data list, which the old size() check did not.
            while (ObjectUtil.isNotEmpty(page.getDatas())) {
                results.addAll(page.getDatas());
                if (!ObjectUtil.isNotEmpty(scrollId)) {
                    // Without a scroll id there is nothing to continue from.
                    break;
                }
                JSONObject scrollBody = new JSONObject();
                scrollBody.put("scroll", SCROLL_KEEP_ALIVE);
                scrollBody.put("scroll_id", scrollId);
                result = parseBody(performRequest("GET", SCROLL_ENDPOINT, scrollBody.toString()));
                page = result.toJavaObject(EsResponse.class);
                // Continue from the id returned by the latest page when one is present.
                String nextScrollId = result.getString("_scroll_id");
                if (ObjectUtil.isNotEmpty(nextScrollId)) {
                    scrollId = nextScrollId;
                }
            }
        } catch (IOException | RuntimeException e) {
            // Release the scroll on failure too, without hiding the original error.
            try {
                clearScroll(scrollId);
            } catch (IOException | RuntimeException cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }

        // Delete the scroll promptly once all pages are read, to free resources.
        clearScroll(scrollId);
        return results;
    }

    /**
     * Sends a bulk request to {@code [index/][type/]_bulk?refresh=true}.
     *
     * @param index  index name; omitted from the endpoint when empty
     * @param type   type name; omitted from the endpoint when empty
     * @param entity request body, passed through unchanged
     * @return the raw response
     * @throws IOException if the request fails
     */
    public static Response bulk(String index, String type, String entity) throws IOException {
        StringBuilder endPoint = new StringBuilder();
        if (ObjectUtil.isNotEmpty(index)) {
            endPoint.append(index).append("/");
        }
        if (ObjectUtil.isNotEmpty(type)) {
            endPoint.append(type).append("/");
        }
        endPoint.append("_bulk").append(REFRESH_PARAM);
        logger.debug("{}", entity);
        return performRequest("POST", endPoint.toString(), entity);
    }

    /** Builds {@code index/type/}; throws NullPointerException when either argument is null. */
    private static String typeEndpoint(String index, String type) {
        return index.concat("/").concat(type).concat("/");
    }

    /** Builds {@code index/type/_search}. */
    private static String searchEndpoint(String index, String type) {
        return typeEndpoint(index, type).concat("_search");
    }

    /** Reads the response entity as a string and parses it into a JSON object. */
    private static JSONObject parseBody(Response response) throws IOException {
        return JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
    }

    /** Deletes the scroll identified by {@code scrollId}; does nothing when the id is empty. */
    private static void clearScroll(String scrollId) throws IOException {
        if (!ObjectUtil.isNotEmpty(scrollId)) {
            return;
        }
        JSONObject body = new JSONObject();
        body.put("scroll_id", scrollId);
        performRequest("DELETE", SCROLL_ENDPOINT, body.toString());
    }

    /**
     * Sends one request through the shared client.
     *
     * @param method   HTTP method
     * @param endpoint request path, optionally including a query string
     * @param entity   request body; no body is attached when null or empty
     * @return the response returned by the client
     * @throws IOException if the client reports a failure
     */
    private static Response performRequest(String method, String endpoint, String entity) throws IOException {
        Request request = new Request(method, endpoint);
        if (ObjectUtil.isNotEmpty(entity)) {
            // NOTE(review): the entity is built from a charset only, with no explicit JSON
            // content type — confirm the target server accepts this.
            request.setEntity(new NStringEntity(entity, "utf-8"));
        }

        // Basic authentication is added only when both credentials are configured.
        if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) {
            RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
            String token = username.concat(":").concat(password);
            String base64encode = Base64.getEncoder().encodeToString(token.getBytes(StandardCharsets.UTF_8));
            builder.addHeader("Authorization", "Basic " + base64encode);
            request.setOptions(builder.build());
        }

        Response response = client.performRequest(request);
        logger.debug("{}", response);
        return response;
    }

}