diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index 5dbe030..320382b 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.ListIterator; /** * ES查询请求体 @@ -144,7 +145,7 @@ } /** - * 条件关闭口,将bool中的元素添加到must集合中 + * 条件关闭口,将bool中的元素添加到must集合中 */ public void endShould() { this.filterBoolMust.add(this.boolBuild); @@ -201,7 +202,6 @@ } - public void match(String filed, Serializable value) { JSONArray boolMust = this.boolMust; JSONObject match = new JSONObject(); @@ -258,6 +258,31 @@ this.range(filed, from, to, MUST_FLAG); } + public void removeRange() { + String key = "range"; + if (this.filterBoolMust != null) { + ListIterator integerListIterator = this.filterBoolMust.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + if (this.filterBoolMustnot != null) { + ListIterator integerListIterator = this.filterBoolMustnot.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + + } + /** * 范围条件是否必须 */ @@ -287,7 +312,7 @@ public void from(int from) { this.body.put("from", from); - if(from>10000){ + if (from > 10000) { this.body.put("query", "match_all"); } } diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index 5dbe030..320382b 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.ListIterator; /** * ES查询请求体 @@ -144,7 +145,7 @@ } /** - * 条件关闭口,将bool中的元素添加到must集合中 + * 条件关闭口,将bool中的元素添加到must集合中 */ public void endShould() { this.filterBoolMust.add(this.boolBuild); @@ -201,7 +202,6 @@ } - public void match(String filed, Serializable value) { JSONArray boolMust = this.boolMust; JSONObject match = new JSONObject(); @@ -258,6 +258,31 @@ this.range(filed, from, to, MUST_FLAG); } + public void removeRange() { + String key = "range"; + if (this.filterBoolMust != null) { + ListIterator integerListIterator = this.filterBoolMust.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + if (this.filterBoolMustnot != null) { + ListIterator integerListIterator = this.filterBoolMustnot.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + + } + /** * 范围条件是否必须 */ @@ -287,7 +312,7 @@ public void from(int from) { this.body.put("from", from); - if(from>10000){ + if (from > 10000) { this.body.put("query", "match_all"); } } diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index 4bbd046..6cd932a 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -1,11 +1,13 @@ package com.casic.missiles.es; +import cn.hutool.core.stream.CollectorUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; +import org.apache.commons.lang3.StringUtils; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.apache.poi.ss.formula.functions.T; @@ -18,12 +20,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; import java.io.IOException; -import java.util.ArrayList; -import java.util.Base64; -import java.util.List; +import java.text.SimpleDateFormat; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; +import java.util.*; /** * ES工具类 @@ -137,19 +141,123 @@ * 1、根据条件查询返回分页,小于1w条最直接返回 * 2、超过1w条,通过回滚框分情况讨论 */ - public static Page searchScrollQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchScrollQueryPage(Page page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { Long currentSize = page.getSize() * page.getCurrent(); if (currentSize < 10000) { + query.range("logTime", startTime, endTime); return searchQueryPage(page, index, type, query); } else { - query.size(10000L); - return searchQueryScrollPage(page, index, type, query); + return budgetQueryPage(page, index, type, startTime, endTime, query); } } /** + * 1、通过总条数/3600计算小时数 + * 2、获取第一条数据的时间差,然后再次进行计算 + * 3、预算查询,直到获取10000条内的数据,这样效率会远高于窗口滚动查询 + */ + private static Page budgetQueryPage(Page page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { + String method = "POST"; + String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); + JSONObject jsonObject = getOffsetTime(page, startTime, endTime, entPoint, query); + if (ObjectUtils.isEmpty(jsonObject)) { + return page; + } + query.removeRange(); + query.range("logTime", startTime, (String) jsonObject.get("time")); + query.from(Integer.valueOf(String.valueOf(jsonObject.get("from")))); + Response response = performRequest(method, entPoint, query.getBody().toString()); + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + List results = esResponse.getDatas(); + page.setRecords(results); + page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + return page; + } + + private static JSONObject getOffsetTime(Page page, String startTime, String endTime, String entPoint, ElasticSearchQuery query) throws Exception { + Long currentSize = page.getSize() * page.getCurrent(); + String newEndTime = ""; + Long offsetHour = currentSize / 3600; + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + while (true) { + EsResponse leftTotal = mergerQueryTime(startTime, newEndTime, query, entPoint); + Integer rightTotal = mergerQuery(newEndTime, endTime, query, entPoint); + if (currentSize - rightTotal < 10000&¤tSize - rightTotal >0) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", newEndTime); + jsonObject.put("from", currentSize - rightTotal); + return jsonObject; + } + //最近是空数据 + if (rightTotal == 0 || currentSize - rightTotal > 0) { + //重新偏移坐标,计算坐标 + currentSize -= rightTotal; + if (CollectionUtils.isEmpty(leftTotal.getDatas()) || Long.valueOf(leftTotal.getHits().getTotal()) < 10000) { + return null; + } + newEndTime = (String) ((JSONObject) leftTotal.getDatas().get(0)).get("logTime"); + endTime = newEndTime; + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + } else if (currentSize - rightTotal < 0) { + double multiple = (double) rightTotal / currentSize; + offsetHour = (long) (offsetHour / multiple); + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + } + if (currentSize > Long.valueOf(leftTotal.getHits().getTotal())) { + return null; + } + } + } + + + private static EsResponse mergerQueryTime(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { + String method = "POST"; + query.removeRange(); + query.range("logTime", beginTime, endTime); + logger.debug(query.getBody().toString()); + Response response = performRequest(method, entPoint, query.getBody().toString()); + // 获取response body,转换为json对象 + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + return esResponse; + } + + private static Integer mergerQuery(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { + String method = "POST"; + query.removeRange(); + query.range("logTime", beginTime, endTime); + Response response = performRequest(method, entPoint, query.getBody().toString()); + // 获取response body,转换为json对象 + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + int total = Integer.valueOf(esResponse.getHits().getTotal()); + return total; + } + + /** + * 获取偏移时间 + * + * @param beginTime + * @param offsetHour + * @return + * @throws Exception + */ + private static String getOffsetTime(String beginTime, Integer offsetHour) throws Exception { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + if (StringUtils.isEmpty(beginTime)) { + beginTime = format.format(new Date()); + } + //获取上一条最后的保存时间与当前时间进行相减,与保存间隔相比较 + Calendar calendar = Calendar.getInstance(); + calendar.setTime(format.parse(beginTime)); + calendar.add(Calendar.HOUR_OF_DAY, offsetHour); + return format.format(calendar.getTime()); + } + + /** * 1w条以上的通过滑动窗口实现 - * 1、 + * 太慢了,丢弃 * * @param index * @param type @@ -158,37 +266,46 @@ * @return * @throws IOException */ - public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws + IOException { String method = "POST"; List results = new ArrayList<>(); - Long currentSize = page.getSize() * page.getCurrent(); + query.size(10000L); + Long currentSize = page.getSize() * page.getCurrent() + page.getSize(); String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + if (currentSize >= page.getTotal()) { + return page; + } + currentSize -= 10000; String scrollId = result.getString("_scroll_id"); entPoint = "_search/scroll"; JSONObject scrollBody = new JSONObject(); scrollBody.put("scroll", "1m"); scrollBody.put("scroll_id", scrollId); - while (esResponse.getDatas().size() > 0 && currentSize > 10000) { - method = "GET"; + method = "GET"; + Long tempSize = currentSize > 10000 ? 10000 : currentSize; + while (esResponse.getDatas().size() > 0 && currentSize > 0) { + query.size(tempSize); response = performRequest(method, entPoint, scrollBody.toString()); - result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); - esResponse = result.toJavaObject(EsResponse.class); - if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { - results = esResponse.getDatas(); - } - currentSize -= 10000; + currentSize -= currentSize > 10000 ? 10000 : currentSize; + } + result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + esResponse = result.toJavaObject(EsResponse.class); + if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { + results = esResponse.getDatas(); } // 查询完成后,及时删除scroll,释放资源 scrollBody.remove("scroll"); method = "DELETE"; performRequest(method, entPoint, scrollBody.toString()); logger.debug(query.getBody().toString()); - results = results.subList(Integer.valueOf(String.valueOf(currentSize)), Integer.valueOf(String.valueOf(currentSize + page.getSize()))); + Long beginIndex = (page.getSize() * page.getCurrent()) / 10000; + results = results.subList(Integer.valueOf(String.valueOf(beginIndex)), Integer.valueOf(String.valueOf(beginIndex + page.getSize()))); page.setRecords(results); return page; } @@ -196,7 +313,8 @@ /** * 根据条件进行滚动查询 */ - public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { + public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws + IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List results = new ArrayList<>(); String method = "POST"; @@ -258,7 +376,6 @@ if (ObjectUtil.isNotEmpty(entity)) { request.setEntity(new NStringEntity(entity, "utf-8")); } - if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); String token = username.concat(":").concat(password); diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index 5dbe030..320382b 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.ListIterator; /** * ES查询请求体 @@ -144,7 +145,7 @@ } /** - * 条件关闭口,将bool中的元素添加到must集合中 + * 条件关闭口,将bool中的元素添加到must集合中 */ public void endShould() { this.filterBoolMust.add(this.boolBuild); @@ -201,7 +202,6 @@ } - public void match(String filed, Serializable value) { JSONArray boolMust = this.boolMust; JSONObject match = new JSONObject(); @@ -258,6 +258,31 @@ this.range(filed, from, to, MUST_FLAG); } + public void removeRange() { + String key = "range"; + if (this.filterBoolMust != null) { + ListIterator integerListIterator = this.filterBoolMust.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + if (this.filterBoolMustnot != null) { + ListIterator integerListIterator = this.filterBoolMustnot.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + + } + /** * 范围条件是否必须 */ @@ -287,7 +312,7 @@ public void from(int from) { this.body.put("from", from); - if(from>10000){ + if (from > 10000) { this.body.put("query", "match_all"); } } diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index 4bbd046..6cd932a 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -1,11 +1,13 @@ package com.casic.missiles.es; +import cn.hutool.core.stream.CollectorUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; +import org.apache.commons.lang3.StringUtils; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.apache.poi.ss.formula.functions.T; @@ -18,12 +20,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; import java.io.IOException; -import java.util.ArrayList; -import java.util.Base64; -import java.util.List; +import java.text.SimpleDateFormat; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; +import java.util.*; /** * ES工具类 @@ -137,19 +141,123 @@ * 1、根据条件查询返回分页,小于1w条最直接返回 * 2、超过1w条,通过回滚框分情况讨论 */ - public static Page searchScrollQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchScrollQueryPage(Page page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { Long currentSize = page.getSize() * page.getCurrent(); if (currentSize < 10000) { + query.range("logTime", startTime, endTime); return searchQueryPage(page, index, type, query); } else { - query.size(10000L); - return searchQueryScrollPage(page, index, type, query); + return budgetQueryPage(page, index, type, startTime, endTime, query); } } /** + * 1、通过总条数/3600计算小时数 + * 2、获取第一条数据的时间差,然后再次进行计算 + * 3、预算查询,直到获取10000条内的数据,这样效率会远高于窗口滚动查询 + */ + private static Page budgetQueryPage(Page page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { + String method = "POST"; + String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); + JSONObject jsonObject = getOffsetTime(page, startTime, endTime, entPoint, query); + if (ObjectUtils.isEmpty(jsonObject)) { + return page; + } + query.removeRange(); + query.range("logTime", startTime, (String) jsonObject.get("time")); + query.from(Integer.valueOf(String.valueOf(jsonObject.get("from")))); + Response response = performRequest(method, entPoint, query.getBody().toString()); + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + List results = esResponse.getDatas(); + page.setRecords(results); + page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + return page; + } + + private static JSONObject getOffsetTime(Page page, String startTime, String endTime, String entPoint, ElasticSearchQuery query) throws Exception { + Long currentSize = page.getSize() * page.getCurrent(); + String newEndTime = ""; + Long offsetHour = currentSize / 3600; + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + while (true) { + EsResponse leftTotal = mergerQueryTime(startTime, newEndTime, query, entPoint); + Integer rightTotal = mergerQuery(newEndTime, endTime, query, entPoint); + if (currentSize - rightTotal < 10000&¤tSize - rightTotal >0) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", newEndTime); + jsonObject.put("from", currentSize - rightTotal); + return jsonObject; + } + //最近是空数据 + if (rightTotal == 0 || currentSize - rightTotal > 0) { + //重新偏移坐标,计算坐标 + currentSize -= rightTotal; + if (CollectionUtils.isEmpty(leftTotal.getDatas()) || Long.valueOf(leftTotal.getHits().getTotal()) < 10000) { + return null; + } + newEndTime = (String) ((JSONObject) leftTotal.getDatas().get(0)).get("logTime"); + endTime = newEndTime; + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + } else if (currentSize - rightTotal < 0) { + double multiple = (double) rightTotal / currentSize; + offsetHour = (long) (offsetHour / multiple); + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + } + if (currentSize > Long.valueOf(leftTotal.getHits().getTotal())) { + return null; + } + } + } + + + private static EsResponse mergerQueryTime(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { + String method = "POST"; + query.removeRange(); + query.range("logTime", beginTime, endTime); + logger.debug(query.getBody().toString()); + Response response = performRequest(method, entPoint, query.getBody().toString()); + // 获取response body,转换为json对象 + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + return esResponse; + } + + private static Integer mergerQuery(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { + String method = "POST"; + query.removeRange(); + query.range("logTime", beginTime, endTime); + Response response = performRequest(method, entPoint, query.getBody().toString()); + // 获取response body,转换为json对象 + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + int total = Integer.valueOf(esResponse.getHits().getTotal()); + return total; + } + + /** + * 获取偏移时间 + * + * @param beginTime + * @param offsetHour + * @return + * @throws Exception + */ + private static String getOffsetTime(String beginTime, Integer offsetHour) throws Exception { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + if (StringUtils.isEmpty(beginTime)) { + beginTime = format.format(new Date()); + } + //获取上一条最后的保存时间与当前时间进行相减,与保存间隔相比较 + Calendar calendar = Calendar.getInstance(); + calendar.setTime(format.parse(beginTime)); + calendar.add(Calendar.HOUR_OF_DAY, offsetHour); + return format.format(calendar.getTime()); + } + + /** * 1w条以上的通过滑动窗口实现 - * 1、 + * 太慢了,丢弃 * * @param index * @param type @@ -158,37 +266,46 @@ * @return * @throws IOException */ - public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws + IOException { String method = "POST"; List results = new ArrayList<>(); - Long currentSize = page.getSize() * page.getCurrent(); + query.size(10000L); + Long currentSize = page.getSize() * page.getCurrent() + page.getSize(); String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + if (currentSize >= page.getTotal()) { + return page; + } + currentSize -= 10000; String scrollId = result.getString("_scroll_id"); entPoint = "_search/scroll"; JSONObject scrollBody = new JSONObject(); scrollBody.put("scroll", "1m"); scrollBody.put("scroll_id", scrollId); - while (esResponse.getDatas().size() > 0 && currentSize > 10000) { - method = "GET"; + method = "GET"; + Long tempSize = currentSize > 10000 ? 10000 : currentSize; + while (esResponse.getDatas().size() > 0 && currentSize > 0) { + query.size(tempSize); response = performRequest(method, entPoint, scrollBody.toString()); - result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); - esResponse = result.toJavaObject(EsResponse.class); - if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { - results = esResponse.getDatas(); - } - currentSize -= 10000; + currentSize -= currentSize > 10000 ? 10000 : currentSize; + } + result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + esResponse = result.toJavaObject(EsResponse.class); + if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { + results = esResponse.getDatas(); } // 查询完成后,及时删除scroll,释放资源 scrollBody.remove("scroll"); method = "DELETE"; performRequest(method, entPoint, scrollBody.toString()); logger.debug(query.getBody().toString()); - results = results.subList(Integer.valueOf(String.valueOf(currentSize)), Integer.valueOf(String.valueOf(currentSize + page.getSize()))); + Long beginIndex = (page.getSize() * page.getCurrent()) / 10000; + results = results.subList(Integer.valueOf(String.valueOf(beginIndex)), Integer.valueOf(String.valueOf(beginIndex + page.getSize()))); page.setRecords(results); return page; } @@ -196,7 +313,8 @@ /** * 根据条件进行滚动查询 */ - public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { + public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws + IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List results = new ArrayList<>(); String method = "POST"; @@ -258,7 +376,6 @@ if (ObjectUtil.isNotEmpty(entity)) { request.setEntity(new NStringEntity(entity, "utf-8")); } - if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); String token = username.concat(":").concat(password); diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java b/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java index a0a18d5..12f269c 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java @@ -41,7 +41,7 @@ * 获取巡检日志分页列表 */ @RequestMapping(value = "/listPage") - public Object listPage(@RequestBody @Valid BusPatrolLogDTO busPatrolLogDTO, BindingResult bindingResult) throws IOException { + public Object listPage(@RequestBody @Valid BusPatrolLogDTO busPatrolLogDTO, BindingResult bindingResult) throws Exception { Assert.isFalse(bindingResult.hasErrors(), () -> { throw new BusinessException(CoreExceptionEnum.REQUEST_NULL.getCode(), bindingResult.getFieldError().getDefaultMessage()); }); diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index 5dbe030..320382b 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.ListIterator; /** * ES查询请求体 @@ -144,7 +145,7 @@ } /** - * 条件关闭口,将bool中的元素添加到must集合中 + * 条件关闭口,将bool中的元素添加到must集合中 */ public void endShould() { this.filterBoolMust.add(this.boolBuild); @@ -201,7 +202,6 @@ } - public void match(String filed, Serializable value) { JSONArray boolMust = this.boolMust; JSONObject match = new JSONObject(); @@ -258,6 +258,31 @@ this.range(filed, from, to, MUST_FLAG); } + public void removeRange() { + String key = "range"; + if (this.filterBoolMust != null) { + ListIterator integerListIterator = this.filterBoolMust.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + if (this.filterBoolMustnot != null) { + ListIterator integerListIterator = this.filterBoolMustnot.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + + } + /** * 范围条件是否必须 */ @@ -287,7 +312,7 @@ public void from(int from) { this.body.put("from", from); - if(from>10000){ + if (from > 10000) { this.body.put("query", "match_all"); } } diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index 4bbd046..6cd932a 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -1,11 +1,13 @@ package com.casic.missiles.es; +import cn.hutool.core.stream.CollectorUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; +import org.apache.commons.lang3.StringUtils; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.apache.poi.ss.formula.functions.T; @@ -18,12 +20,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; import java.io.IOException; -import java.util.ArrayList; -import java.util.Base64; -import java.util.List; +import java.text.SimpleDateFormat; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; +import java.util.*; /** * ES工具类 @@ -137,19 +141,123 @@ * 1、根据条件查询返回分页,小于1w条最直接返回 * 2、超过1w条,通过回滚框分情况讨论 */ - public static Page searchScrollQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchScrollQueryPage(Page page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { Long currentSize = page.getSize() * page.getCurrent(); if (currentSize < 10000) { + query.range("logTime", startTime, endTime); return searchQueryPage(page, index, type, query); } else { - query.size(10000L); - return searchQueryScrollPage(page, index, type, query); + return budgetQueryPage(page, index, type, startTime, endTime, query); } } /** + * 1、通过总条数/3600计算小时数 + * 2、获取第一条数据的时间差,然后再次进行计算 + * 3、预算查询,直到获取10000条内的数据,这样效率会远高于窗口滚动查询 + */ + private static Page budgetQueryPage(Page page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { + String method = "POST"; + String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); + JSONObject jsonObject = getOffsetTime(page, startTime, endTime, entPoint, query); + if (ObjectUtils.isEmpty(jsonObject)) { + return page; + } + query.removeRange(); + query.range("logTime", startTime, (String) jsonObject.get("time")); + query.from(Integer.valueOf(String.valueOf(jsonObject.get("from")))); + Response response = performRequest(method, entPoint, query.getBody().toString()); + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + List results = esResponse.getDatas(); + page.setRecords(results); + page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + return page; + } + + private static JSONObject getOffsetTime(Page page, String startTime, String endTime, String entPoint, ElasticSearchQuery query) throws Exception { + Long currentSize = page.getSize() * page.getCurrent(); + String newEndTime = ""; + Long offsetHour = currentSize / 3600; + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + while (true) { + EsResponse leftTotal = mergerQueryTime(startTime, newEndTime, query, entPoint); + Integer rightTotal = mergerQuery(newEndTime, endTime, query, entPoint); + if (currentSize - rightTotal < 10000&¤tSize - rightTotal >0) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", newEndTime); + jsonObject.put("from", currentSize - rightTotal); + return jsonObject; + } + //最近是空数据 + if (rightTotal == 0 || currentSize - rightTotal > 0) { + //重新偏移坐标,计算坐标 + currentSize -= rightTotal; + if (CollectionUtils.isEmpty(leftTotal.getDatas()) || Long.valueOf(leftTotal.getHits().getTotal()) < 10000) { + return null; + } + newEndTime = (String) ((JSONObject) leftTotal.getDatas().get(0)).get("logTime"); + endTime = newEndTime; + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + } else if (currentSize - rightTotal < 0) { + double multiple = (double) rightTotal / currentSize; + offsetHour = (long) (offsetHour / multiple); + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + } + if (currentSize > Long.valueOf(leftTotal.getHits().getTotal())) { + return null; + } + } + } + + + private static EsResponse mergerQueryTime(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { + String method = "POST"; + query.removeRange(); + query.range("logTime", beginTime, endTime); + logger.debug(query.getBody().toString()); + Response response = performRequest(method, entPoint, query.getBody().toString()); + // 获取response body,转换为json对象 + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + return esResponse; + } + + private static Integer mergerQuery(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { + String method = "POST"; + query.removeRange(); + query.range("logTime", beginTime, endTime); + Response response = performRequest(method, entPoint, query.getBody().toString()); + // 获取response body,转换为json对象 + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + int total = Integer.valueOf(esResponse.getHits().getTotal()); + return total; + } + + /** + * 获取偏移时间 + * + * @param beginTime + * @param offsetHour + * @return + * @throws Exception + */ + private static String getOffsetTime(String beginTime, Integer offsetHour) throws Exception { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + if (StringUtils.isEmpty(beginTime)) { + beginTime = format.format(new Date()); + } + //获取上一条最后的保存时间与当前时间进行相减,与保存间隔相比较 + Calendar calendar = Calendar.getInstance(); + calendar.setTime(format.parse(beginTime)); + calendar.add(Calendar.HOUR_OF_DAY, offsetHour); + return format.format(calendar.getTime()); + } + + /** * 1w条以上的通过滑动窗口实现 - * 1、 + * 太慢了,丢弃 * * @param index * @param type @@ -158,37 +266,46 @@ * @return * @throws IOException */ - public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws + IOException { String method = "POST"; List results = new ArrayList<>(); - Long currentSize = page.getSize() * page.getCurrent(); + query.size(10000L); + Long currentSize = page.getSize() * page.getCurrent() + page.getSize(); String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + if (currentSize >= page.getTotal()) { + return page; + } + currentSize -= 10000; String scrollId = result.getString("_scroll_id"); entPoint = "_search/scroll"; JSONObject scrollBody = new JSONObject(); scrollBody.put("scroll", "1m"); scrollBody.put("scroll_id", scrollId); - while (esResponse.getDatas().size() > 0 && currentSize > 10000) { - method = "GET"; + method = "GET"; + Long tempSize = currentSize > 10000 ? 10000 : currentSize; + while (esResponse.getDatas().size() > 0 && currentSize > 0) { + query.size(tempSize); response = performRequest(method, entPoint, scrollBody.toString()); - result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); - esResponse = result.toJavaObject(EsResponse.class); - if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { - results = esResponse.getDatas(); - } - currentSize -= 10000; + currentSize -= currentSize > 10000 ? 10000 : currentSize; + } + result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + esResponse = result.toJavaObject(EsResponse.class); + if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { + results = esResponse.getDatas(); } // 查询完成后,及时删除scroll,释放资源 scrollBody.remove("scroll"); method = "DELETE"; performRequest(method, entPoint, scrollBody.toString()); logger.debug(query.getBody().toString()); - results = results.subList(Integer.valueOf(String.valueOf(currentSize)), Integer.valueOf(String.valueOf(currentSize + page.getSize()))); + Long beginIndex = (page.getSize() * page.getCurrent()) / 10000; + results = results.subList(Integer.valueOf(String.valueOf(beginIndex)), Integer.valueOf(String.valueOf(beginIndex + page.getSize()))); page.setRecords(results); return page; } @@ -196,7 +313,8 @@ /** * 根据条件进行滚动查询 */ - public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { + public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws + IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List results = new ArrayList<>(); String method = "POST"; @@ -258,7 +376,6 @@ if (ObjectUtil.isNotEmpty(entity)) { request.setEntity(new NStringEntity(entity, "utf-8")); } - if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); String token = username.concat(":").concat(password); diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java b/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java index a0a18d5..12f269c 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java @@ -41,7 +41,7 @@ * 获取巡检日志分页列表 */ @RequestMapping(value = "/listPage") - public Object listPage(@RequestBody @Valid BusPatrolLogDTO busPatrolLogDTO, BindingResult bindingResult) throws IOException { + public Object listPage(@RequestBody @Valid BusPatrolLogDTO busPatrolLogDTO, BindingResult bindingResult) throws Exception { Assert.isFalse(bindingResult.hasErrors(), () -> { throw new BusinessException(CoreExceptionEnum.REQUEST_NULL.getCode(), bindingResult.getFieldError().getDefaultMessage()); }); diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/service/IBusPatrolLogService.java b/casic-data/src/main/java/com/casic/missiles/modular/system/service/IBusPatrolLogService.java index 8d25b9b..ac49360 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/service/IBusPatrolLogService.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/service/IBusPatrolLogService.java @@ -25,7 +25,7 @@ */ public interface IBusPatrolLogService extends IService { - Page patrolLogListPage(Page< JSONObject> page, Long monitorId, String startTime, String endTime) throws IOException; + Page patrolLogListPage(Page< JSONObject> page, Long monitorId, String startTime, String endTime) throws Exception; void export(BusPatrolLogDTO busPatrolLogDTO, HttpServletResponse response); diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java index 5dbe030..320382b 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchQuery.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.ListIterator; /** * ES查询请求体 @@ -144,7 +145,7 @@ } /** - * 条件关闭口,将bool中的元素添加到must集合中 + * 条件关闭口,将bool中的元素添加到must集合中 */ public void endShould() { this.filterBoolMust.add(this.boolBuild); @@ -201,7 +202,6 @@ } - public void match(String filed, Serializable value) { JSONArray boolMust = this.boolMust; JSONObject match = new JSONObject(); @@ -258,6 +258,31 @@ this.range(filed, from, to, MUST_FLAG); } + public void removeRange() { + String key = "range"; + if (this.filterBoolMust != null) { + ListIterator integerListIterator = this.filterBoolMust.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + if (this.filterBoolMustnot != null) { + ListIterator integerListIterator = this.filterBoolMustnot.listIterator(); + while (integerListIterator.hasNext()){ + Object next = integerListIterator.next(); + if (((JSONObject) next).containsKey(key)) { + ((JSONObject) next).remove(key); + integerListIterator.remove(); + } + } + } + + } + /** * 范围条件是否必须 */ @@ -287,7 +312,7 @@ public void from(int from) { this.body.put("from", from); - if(from>10000){ + if (from > 10000) { this.body.put("query", "match_all"); } } diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index 4bbd046..6cd932a 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -1,11 +1,13 @@ package com.casic.missiles.es; +import cn.hutool.core.stream.CollectorUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.casic.missiles.dto.EsResponse; +import org.apache.commons.lang3.StringUtils; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.apache.poi.ss.formula.functions.T; @@ -18,12 +20,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; import java.io.IOException; -import java.util.ArrayList; -import java.util.Base64; -import java.util.List; +import java.text.SimpleDateFormat; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; +import java.util.*; /** * ES工具类 @@ -137,19 +141,123 @@ * 1、根据条件查询返回分页,小于1w条最直接返回 * 2、超过1w条,通过回滚框分情况讨论 */ - public static Page searchScrollQueryPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchScrollQueryPage(Page page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { Long currentSize = page.getSize() * page.getCurrent(); if (currentSize < 10000) { + query.range("logTime", startTime, endTime); return searchQueryPage(page, index, type, query); } else { - query.size(10000L); - return searchQueryScrollPage(page, index, type, query); + return budgetQueryPage(page, index, type, startTime, endTime, query); } } /** + * 1、通过总条数/3600计算小时数 + * 2、获取第一条数据的时间差,然后再次进行计算 + * 3、预算查询,直到获取10000条内的数据,这样效率会远高于窗口滚动查询 + */ + private static Page budgetQueryPage(Page page, String index, String type, String startTime, String endTime, ElasticSearchQuery query) throws Exception { + String method = "POST"; + String entPoint = index.concat("/").concat(type).concat("/").concat("_search"); + JSONObject jsonObject = getOffsetTime(page, startTime, endTime, entPoint, query); + if (ObjectUtils.isEmpty(jsonObject)) { + return page; + } + query.removeRange(); + query.range("logTime", startTime, (String) jsonObject.get("time")); + query.from(Integer.valueOf(String.valueOf(jsonObject.get("from")))); + Response response = performRequest(method, entPoint, query.getBody().toString()); + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + List results = esResponse.getDatas(); + page.setRecords(results); + page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + return page; + } + + private static JSONObject getOffsetTime(Page page, String startTime, String endTime, String entPoint, ElasticSearchQuery query) throws Exception { + Long currentSize = page.getSize() * page.getCurrent(); + String newEndTime = ""; + Long offsetHour = currentSize / 3600; + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + while (true) { + EsResponse leftTotal = mergerQueryTime(startTime, newEndTime, query, entPoint); + Integer rightTotal = mergerQuery(newEndTime, endTime, query, entPoint); + if (currentSize - rightTotal < 10000&¤tSize - rightTotal >0) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", newEndTime); + jsonObject.put("from", currentSize - rightTotal); + return jsonObject; + } + //最近是空数据 + if (rightTotal == 0 || currentSize - rightTotal > 0) { + //重新偏移坐标,计算坐标 + currentSize -= rightTotal; + if (CollectionUtils.isEmpty(leftTotal.getDatas()) || Long.valueOf(leftTotal.getHits().getTotal()) < 10000) { + return null; + } + newEndTime = (String) ((JSONObject) leftTotal.getDatas().get(0)).get("logTime"); + endTime = newEndTime; + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + } else if (currentSize - rightTotal < 0) { + double multiple = (double) rightTotal / currentSize; + offsetHour = (long) (offsetHour / multiple); + newEndTime = getOffsetTime(endTime, -Integer.valueOf(String.valueOf(offsetHour))); + } + if (currentSize > Long.valueOf(leftTotal.getHits().getTotal())) { + return null; + } + } + } + + + private static EsResponse mergerQueryTime(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { + String method = "POST"; + query.removeRange(); + query.range("logTime", beginTime, endTime); + logger.debug(query.getBody().toString()); + Response response = performRequest(method, entPoint, query.getBody().toString()); + // 获取response body,转换为json对象 + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + return esResponse; + } + + private static Integer mergerQuery(String beginTime, String endTime, ElasticSearchQuery query, String entPoint) throws Exception { + String method = "POST"; + query.removeRange(); + query.range("logTime", beginTime, endTime); + Response response = performRequest(method, entPoint, query.getBody().toString()); + // 获取response body,转换为json对象 + JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + EsResponse esResponse = result.toJavaObject(EsResponse.class); + int total = Integer.valueOf(esResponse.getHits().getTotal()); + return total; + } + + /** + * 获取偏移时间 + * + * @param beginTime + * @param offsetHour + * @return + * @throws Exception + */ + private static String getOffsetTime(String beginTime, Integer offsetHour) throws Exception { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + if (StringUtils.isEmpty(beginTime)) { + beginTime = format.format(new Date()); + } + //获取上一条最后的保存时间与当前时间进行相减,与保存间隔相比较 + Calendar calendar = Calendar.getInstance(); + calendar.setTime(format.parse(beginTime)); + calendar.add(Calendar.HOUR_OF_DAY, offsetHour); + return format.format(calendar.getTime()); + } + + /** * 1w条以上的通过滑动窗口实现 - * 1、 + * 太慢了,丢弃 * * @param index * @param type @@ -158,37 +266,46 @@ * @return * @throws IOException */ - public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws IOException { + public static Page searchQueryScrollPage(Page page, String index, String type, ElasticSearchQuery query) throws + IOException { String method = "POST"; List results = new ArrayList<>(); - Long currentSize = page.getSize() * page.getCurrent(); + query.size(10000L); + Long currentSize = page.getSize() * page.getCurrent() + page.getSize(); String entPoint = index.concat("/").concat(type).concat("/").concat("_search").concat("?scroll=1m"); logger.debug(query.getBody().toString()); Response response = performRequest(method, entPoint, query.getBody().toString()); JSONObject result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); EsResponse esResponse = result.toJavaObject(EsResponse.class); page.setTotal(Long.valueOf(esResponse.getHits().getTotal())); + if (currentSize >= page.getTotal()) { + return page; + } + currentSize -= 10000; String scrollId = result.getString("_scroll_id"); entPoint = "_search/scroll"; JSONObject scrollBody = new JSONObject(); scrollBody.put("scroll", "1m"); scrollBody.put("scroll_id", scrollId); - while (esResponse.getDatas().size() > 0 && currentSize > 10000) { - method = "GET"; + method = "GET"; + Long tempSize = currentSize > 10000 ? 10000 : currentSize; + while (esResponse.getDatas().size() > 0 && currentSize > 0) { + query.size(tempSize); response = performRequest(method, entPoint, scrollBody.toString()); - result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); - esResponse = result.toJavaObject(EsResponse.class); - if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { - results = esResponse.getDatas(); - } - currentSize -= 10000; + currentSize -= currentSize > 10000 ? 10000 : currentSize; + } + result = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); + esResponse = result.toJavaObject(EsResponse.class); + if (ObjectUtil.isNotEmpty(esResponse.getDatas())) { + results = esResponse.getDatas(); } // 查询完成后,及时删除scroll,释放资源 scrollBody.remove("scroll"); method = "DELETE"; performRequest(method, entPoint, scrollBody.toString()); logger.debug(query.getBody().toString()); - results = results.subList(Integer.valueOf(String.valueOf(currentSize)), Integer.valueOf(String.valueOf(currentSize + page.getSize()))); + Long beginIndex = (page.getSize() * page.getCurrent()) / 10000; + results = results.subList(Integer.valueOf(String.valueOf(beginIndex)), Integer.valueOf(String.valueOf(beginIndex + page.getSize()))); page.setRecords(results); return page; } @@ -196,7 +313,8 @@ /** * 根据条件进行滚动查询 */ - public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws IOException { + public static List searchQueryScroll(String index, String type, ElasticSearchQuery query) throws + IOException { // 首次查询,提交查询条件,endpoint增加‘?scroll=1m’ List results = new ArrayList<>(); String method = "POST"; @@ -258,7 +376,6 @@ if (ObjectUtil.isNotEmpty(entity)) { request.setEntity(new NStringEntity(entity, "utf-8")); } - if (ObjectUtil.isNotEmpty(username) && ObjectUtil.isNotEmpty(password)) { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); String token = username.concat(":").concat(password); diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java b/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java index a0a18d5..12f269c 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/controller/BusPatrolLogController.java @@ -41,7 +41,7 @@ * 获取巡检日志分页列表 */ @RequestMapping(value = "/listPage") - public Object listPage(@RequestBody @Valid BusPatrolLogDTO busPatrolLogDTO, BindingResult bindingResult) throws IOException { + public Object listPage(@RequestBody @Valid BusPatrolLogDTO busPatrolLogDTO, BindingResult bindingResult) throws Exception { Assert.isFalse(bindingResult.hasErrors(), () -> { throw new BusinessException(CoreExceptionEnum.REQUEST_NULL.getCode(), bindingResult.getFieldError().getDefaultMessage()); }); diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/service/IBusPatrolLogService.java b/casic-data/src/main/java/com/casic/missiles/modular/system/service/IBusPatrolLogService.java index 8d25b9b..ac49360 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/service/IBusPatrolLogService.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/service/IBusPatrolLogService.java @@ -25,7 +25,7 @@ */ public interface IBusPatrolLogService extends IService { - Page patrolLogListPage(Page< JSONObject> page, Long monitorId, String startTime, String endTime) throws IOException; + Page patrolLogListPage(Page< JSONObject> page, Long monitorId, String startTime, String endTime) throws Exception; void export(BusPatrolLogDTO busPatrolLogDTO, HttpServletResponse response); diff --git a/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java b/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java index a251a90..a5a19c1 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/system/service/impl/BusPatrolLogServiceImpl.java @@ -52,11 +52,11 @@ private final IBaseExportService iBaseExportService; @Override - public Page patrolLogListPage(Page page, Long monitorId, String startTime, String endTime) throws IOException { + public Page patrolLogListPage(Page page, Long monitorId, String startTime, String endTime) throws Exception { ElasticSearchQuery elasticSearchQuery = initParam(monitorId, startTime, endTime); elasticSearchQuery.size(page.getSize()); // elasticSearchQuery.from(Integer.valueOf(String.valueOf(page.getCurrent()*page.getSize()))); - ElasticSearchUtil.searchScrollQueryPage(page,DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); + ElasticSearchUtil.searchScrollQueryPage(page, DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, startTime, endTime, elasticSearchQuery); // searchQueryPage(page, DataConst.DATA_GAS_ES_INDEX, DataConst.DATA_GAS_ES_TYPE, elasticSearchQuery); return page; } @@ -105,7 +105,6 @@ } elasticSearchQuery.endShould(); elasticSearchQuery.term("monitorId", monitorId); - elasticSearchQuery.range("logTime", startTime, endTime); elasticSearchQuery.sort("logTime", "DESC"); return elasticSearchQuery; }