diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index d9a35d5..5dbe030 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -281,12 +281,15 @@ } } - public void size(int size) { + public void size(Long size) { this.body.put("size", size); } public void from(int from) { this.body.put("from", from); + if(from>10000){ + this.body.put("query", "match_all"); + } } public void sort(String sort, String order) { @@ -324,7 +327,7 @@ Date date2 = DateUtil.parse("2020-04-01", "yyyy-MM-dd"); query.range("collTime", date1.getTime(), date2.getTime()); query.term("collState", "3", MUST_NOT_FLAG); - query.size(10); + query.size(10L); query.from(0); query.sort("irId", "asc"); System.out.println(query.getBody().toJSONString()); diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index d9a35d5..5dbe030 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -281,12 +281,15 @@ } } - public void size(int size) { + public void size(Long size) { this.body.put("size", size); } public void from(int from) { this.body.put("from", from); + if(from>10000){ + this.body.put("query", "match_all"); + } } public void sort(String sort, String order) { @@ -324,7 +327,7 @@ Date date2 = DateUtil.parse("2020-04-01", "yyyy-MM-dd"); query.range("collTime", date1.getTime(), date2.getTime()); query.term("collState", "3", MUST_NOT_FLAG); - query.size(10); + query.size(10L); query.from(0); query.sort("irId", "asc"); System.out.println(query.getBody().toJSONString()); diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..4bbd046 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -3,6 +3,7 @@ import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; import org.apache.http.nio.entity.NStringEntity; @@ -115,9 +116,10 @@ } /** + * 只支持1万条以下的下标查询 * 根据条件查询返回分页 */ - public static Page searchQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); @@ -132,9 +134,69 @@ } /** + * 1、根据条件查询返回分页,小于1w条最直接返回 + * 2、超过1w条,通过回滚框分情况讨论 + */ + public static Page searchScrollQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + Long currentSize = page.getSize() * page.getCurrent(); + if (currentSize < 10000) { + return searchQueryPage(page, index, type, query); + } else { + query.size(10000L); + return searchQueryScrollPage(page, index, type, query); + } + } + + /** + * 1w条以上的通过滑动窗口实现 + * 1、 + * + * @param index + * @param type + * @param query + * @param + * @return + * @throws IOException + */ + public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + String method = "POST"; + List results = new ArrayList<>(); + Long currentSize = page.getSize() * page.getCurrent(); + String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); + logger.debug(query.getBody().toString()); + Response response = performRequest(method, entPoint, query.getBody().toString()); + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + String scrollId = result.getString("_scroll_id"); + entPoint = "_search/scroll"; + JSONObject scrollBody = new JSONObject(); + scrollBody.put("scroll", "1m"); + scrollBody.put("scroll_id", scrollId); + while (esResponse.getDatas().size() > 0 && currentSize > 10000) { + method = "GET"; + response = performRequest(method, entPoint, scrollBody.toString()); + result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + esResponse = result.toJavaObject(EsResponse.class); + if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { + results = esResponse.getDatas(); + } + currentSize -= 10000; + } + // 查询完成后,及时删除scroll,释放资源 + scrollBody.remove("scroll"); + method = "DELETE"; + performRequest(method, entPoint, scrollBody.toString()); + logger.debug(query.getBody().toString()); + results = results.subList(Integer.valueOf(String.valueOf(currentSize)), Integer.valueOf(String.valueOf(currentSize + page.getSize()))); + page.setRecords(results); + return page; + } + + /** * 根据条件进行滚动查询 */ - public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { + public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List results = new ArrayList<>(); String method = "POST"; diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index d9a35d5..5dbe030 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -281,12 +281,15 @@ } } - public void size(int size) { + public void size(Long size) { this.body.put("size", size); } public void from(int from) { this.body.put("from", from); + if(from>10000){ + this.body.put("query", "match_all"); + } } public void sort(String sort, String order) { @@ -324,7 +327,7 @@ Date date2 = DateUtil.parse("2020-04-01", "yyyy-MM-dd"); query.range("collTime", date1.getTime(), date2.getTime()); query.term("collState", "3", MUST_NOT_FLAG); - query.size(10); + query.size(10L); query.from(0); query.sort("irId", "asc"); System.out.println(query.getBody().toJSONString()); diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..4bbd046 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -3,6 +3,7 @@ import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; import org.apache.http.nio.entity.NStringEntity; @@ -115,9 +116,10 @@ } /** + * 只支持1万条以下的下标查询 * 根据条件查询返回分页 */ - public static Page searchQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); @@ -132,9 +134,69 @@ } /** + * 1、根据条件查询返回分页,小于1w条最直接返回 + * 2、超过1w条,通过回滚框分情况讨论 + */ + public static Page searchScrollQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + Long currentSize = page.getSize() * page.getCurrent(); + if (currentSize < 10000) { + return searchQueryPage(page, index, type, query); + } else { + query.size(10000L); + return searchQueryScrollPage(page, index, type, query); + } + } + + /** + * 1w条以上的通过滑动窗口实现 + * 1、 + * + * @param index + * @param type + * @param query + * @param + * @return + * @throws IOException + */ + public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + String method = "POST"; + List results = new ArrayList<>(); + Long currentSize = page.getSize() * page.getCurrent(); + String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); + logger.debug(query.getBody().toString()); + Response response = performRequest(method, entPoint, query.getBody().toString()); + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + String scrollId = result.getString("_scroll_id"); + entPoint = "_search/scroll"; + JSONObject scrollBody = new JSONObject(); + scrollBody.put("scroll", "1m"); + scrollBody.put("scroll_id", scrollId); + while (esResponse.getDatas().size() > 0 && currentSize > 10000) { + method = "GET"; + response = performRequest(method, entPoint, scrollBody.toString()); + result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + esResponse = result.toJavaObject(EsResponse.class); + if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { + results = esResponse.getDatas(); + } + currentSize -= 10000; + } + // 查询完成后,及时删除scroll,释放资源 + scrollBody.remove("scroll"); + method = "DELETE"; + performRequest(method, entPoint, scrollBody.toString()); + logger.debug(query.getBody().toString()); + results = results.subList(Integer.valueOf(String.valueOf(currentSize)), Integer.valueOf(String.valueOf(currentSize + page.getSize()))); + page.setRecords(results); + return page; + } + + /** * 根据条件进行滚动查询 */ - public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { + public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List results = new ArrayList<>(); String method = "POST"; diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java b/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java index f408792..a251a90 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java @@ -54,9 +54,10 @@ @Override public Page patrolLogListPage(Page page, Long monitorId, String startTime, String endTime) throws IOException { ElasticSearchQuery elasticSearchQuery = initParam(monitorId, startTime, endTime); - elasticSearchQuery.size(Integer.valueOf(String.valueOf(page.getSize()))); - elasticSearchQuery.from(Integer.valueOf(String.valueOf(page.getCurrent()*page.getSize()))); - page = ElasticSearchUtil.searchQueryPage(page, DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); + elasticSearchQuery.size(page.getSize()); +// elasticSearchQuery.from(Integer.valueOf(String.valueOf(page.getCurrent()*page.getSize()))); + ElasticSearchUtil.searchScrollQueryPage(page,DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); +// searchQueryPage(page, DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); return page; } @@ -68,7 +69,7 @@ page.setSize(10000); try { ElasticSearchQuery elasticSearchQuery = initParam(busPatrolLogDTO.getMonitorId(), busPatrolLogDTO.getStartTime(), busPatrolLogDTO.getEndTime()); - elasticSearchQuery.size(1000); + elasticSearchQuery.size(1000L); List responseList = ElasticSearchUtil.searchQueryScroll(DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); List expResponseList = new ArrayList<>(); responseList.forEach(busPatrolLogResponse -> { diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index d9a35d5..5dbe030 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -281,12 +281,15 @@ } } - public void size(int size) { + public void size(Long size) { this.body.put("size", size); } public void from(int from) { this.body.put("from", from); + if(from>10000){ + this.body.put("query", "match_all"); + } } public void sort(String sort, String order) { @@ -324,7 +327,7 @@ Date date2 = DateUtil.parse("2020-04-01", "yyyy-MM-dd"); query.range("collTime", date1.getTime(), date2.getTime()); query.term("collState", "3", MUST_NOT_FLAG); - query.size(10); + query.size(10L); query.from(0); query.sort("irId", "asc"); System.out.println(query.getBody().toJSONString()); diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..4bbd046 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -3,6 +3,7 @@ import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; import org.apache.http.nio.entity.NStringEntity; @@ -115,9 +116,10 @@ } /** + * 只支持1万条以下的下标查询 * 根据条件查询返回分页 */ - public static Page searchQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { String method = "POST"; String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); logger.debug(query.getBody().toString()); @@ -132,9 +134,69 @@ } /** + * 1、根据条件查询返回分页,小于1w条最直接返回 + * 2、超过1w条,通过回滚框分情况讨论 + */ + public static Page searchScrollQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + Long currentSize = page.getSize() * page.getCurrent(); + if (currentSize < 10000) { + return searchQueryPage(page, index, type, query); + } else { + query.size(10000L); + return searchQueryScrollPage(page, index, type, query); + } + } + + /** + * 1w条以上的通过滑动窗口实现 + * 1、 + * + * @param index + * @param type + * @param query + * @param + * @return + * @throws IOException + */ + public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + String method = "POST"; + List results = new ArrayList<>(); + Long currentSize = page.getSize() * page.getCurrent(); + String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); + logger.debug(query.getBody().toString()); + Response response = performRequest(method, entPoint, query.getBody().toString()); + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + String scrollId = result.getString("_scroll_id"); + entPoint = "_search/scroll"; + JSONObject scrollBody = new JSONObject(); + scrollBody.put("scroll", "1m"); + scrollBody.put("scroll_id", scrollId); + while (esResponse.getDatas().size() > 0 && currentSize > 10000) { + method = "GET"; + response = performRequest(method, entPoint, scrollBody.toString()); + result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + esResponse = result.toJavaObject(EsResponse.class); + if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { + results = esResponse.getDatas(); + } + currentSize -= 10000; + } + // 查询完成后,及时删除scroll,释放资源 + scrollBody.remove("scroll"); + method = "DELETE"; + performRequest(method, entPoint, scrollBody.toString()); + logger.debug(query.getBody().toString()); + results = results.subList(Integer.valueOf(String.valueOf(currentSize)), Integer.valueOf(String.valueOf(currentSize + page.getSize()))); + page.setRecords(results); + return page; + } + + /** * 根据条件进行滚动查询 */ - public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { + public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List results = new ArrayList<>(); String method = "POST"; diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java b/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java index f408792..a251a90 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java @@ -54,9 +54,10 @@ @Override public Page patrolLogListPage(Page page, Long monitorId, String startTime, String endTime) throws IOException { ElasticSearchQuery elasticSearchQuery = initParam(monitorId, startTime, endTime); - elasticSearchQuery.size(Integer.valueOf(String.valueOf(page.getSize()))); - elasticSearchQuery.from(Integer.valueOf(String.valueOf(page.getCurrent()*page.getSize()))); - page = ElasticSearchUtil.searchQueryPage(page, DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); + elasticSearchQuery.size(page.getSize()); +// elasticSearchQuery.from(Integer.valueOf(String.valueOf(page.getCurrent()*page.getSize()))); + ElasticSearchUtil.searchScrollQueryPage(page,DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); +// searchQueryPage(page, DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); return page; } @@ -68,7 +69,7 @@ page.setSize(10000); try { ElasticSearchQuery elasticSearchQuery = initParam(busPatrolLogDTO.getMonitorId(), busPatrolLogDTO.getStartTime(), busPatrolLogDTO.getEndTime()); - elasticSearchQuery.size(1000); + elasticSearchQuery.size(1000L); List responseList = ElasticSearchUtil.searchQueryScroll(DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); List expResponseList = new ArrayList<>(); responseList.forEach(busPatrolLogResponse -> { diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/DataGasServiceImpl.java b/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/DataGasServiceImpl.java index 87d0ae5..92f42ed 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/DataGasServiceImpl.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/DataGasServiceImpl.java @@ -40,7 +40,7 @@ elasticSearchQuery.term("monitorId", monitorId); } elasticSearchQuery.range("logTime", startTime, endTime); - elasticSearchQuery.size(10000); + elasticSearchQuery.size(10000L); elasticSearchQuery.sort("logTime", "DESC"); List dataGasEsList = ElasticSearchUtil.searchQuery(DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); return dataGasEsList;