package com.szpg.plc.server; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import com.szpg.util.Configure; import z.json.JSONException; import z.json.JSONObject; public class FINSCommandPool { private Map<String, List<JSONObject>> messageToSend; //发送命令池 private Map<String, JSONObject> messageToResponse; //响应消息池 private final int VALID_MIN = Integer.parseInt(Configure.getProperty("sys", "POOL.VALID_TIME", "5")); //刷新时间 private final int FRESH_MIN = Integer.parseInt(Configure.getProperty("sys", "POOL.POOL.REFRESH_TIME", "5")); public FINSCommandPool() { messageToSend = new HashMap<String, List<JSONObject>>(); messageToResponse = new HashMap<String, JSONObject>(); } private final Logger logger = Logger.getLogger(this.getClass().getName()); //输出到message public Map<String, JSONObject> getMessageToResponse() { return messageToResponse; } /** * 在发送命令池中添加新的消息(JSON对象形式) * @param messageStr json格式的字符串 * @throws JSONException */ public synchronized void pushMessage(String messageStr) throws JSONException { JSONObject jMessage = new JSONObject(messageStr); String sessionId = jMessage.getString("sessionId"); String rtuAddress = jMessage.getString("rtu"); String type = jMessage.getString("type"); String time = String.valueOf(Calendar.getInstance().getTimeInMillis()); if (messageToSend.containsKey(sessionId) && messageToSend.get(sessionId).isEmpty() == false) { List<JSONObject> list = messageToSend.get(sessionId); JSONObject item = new JSONObject(); item.put("rtu", rtuAddress); item.put("type", type); item.put("timestamp", time); list.add(item); // 更新map messageToSend.put(sessionId, list); } else { List<JSONObject> list = new ArrayList<JSONObject>(); JSONObject item = new JSONObject(); item.put("rtu", rtuAddress); item.put("type", type); item.put("timestamp", time); list.add(item); messageToSend.put(sessionId, list); } } /** * 匹配响应的消息字符串(json形式),若找到,则压入响应消息池中,并将发送命令池中相应的消息删除,证明此次请求全部结束 * @param responseStr 响应字符串 * @throws JSONException */ public synchronized void match(String responseStr) throws JSONException { JSONObject jResponse = new JSONObject(responseStr); String sessionId = jResponse.getString("sessionId"); String rtuAddress = jResponse.getString("rtu"); String type = jResponse.getString("type"); jResponse.put("timestamp", Calendar.getInstance().getTimeInMillis()); if (messageToSend.containsKey(sessionId) && messageToSend.get(sessionId).isEmpty() == false) { List<JSONObject> list = messageToSend.get(sessionId); for (int i = list.size(); i > 0; i--) { JSONObject message = list.get(i - 1); //从命令队列尾部开始遍历 if (message.getString("rtu").equals(rtuAddress)) { if (message.getString("type").equalsIgnoreCase(type) == false) { continue; } else { //找到匹配的消息 //1将响应消息压入响应消息池 messageToResponse.put(sessionId + "-" + rtuAddress + "-" + type, jResponse); //2删除发送命令池中的对象 list.remove(i - 1); break; } } } } } //创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期 //核心线程池的大小为1 public synchronized void refresh() { new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(new RefreshPoolTask(), 0, FRESH_MIN, TimeUnit.MINUTES); } //该线程通过被refresh()方法调用,移除发送命令池和响应消息池中的超时消息 class RefreshPoolTask implements Runnable { @Override public synchronized void run() { Set<String> keySet = messageToSend.keySet(); for (String key : keySet) { List<JSONObject> list = messageToSend.get(key); for (JSONObject item : list) { try { long timestamp = Long.parseLong(item.getString("timestamp")); // 超时移除命令 if (Calendar.getInstance().getTimeInMillis() - timestamp > VALID_MIN * 1000 * 60) { list.remove(item); logger.debug("移除该超时命令:" + item.toString()); } } catch (NumberFormatException e) { logger.error("定时整理消息队列时时间戳转换为整数异常", e); } catch (JSONException e) { logger.error("定时整理消息队列时获取时间戳异常", e); } } // 若队列为空则清除 if (list.isEmpty() == true) { messageToSend.remove(key); } } Set<String> resKeySet = messageToResponse.keySet(); for (String key : resKeySet) { JSONObject item = messageToResponse.get(key); try { long timestamp = Long.parseLong(item.getString("timestamp")); // 超时移除命令 if (Calendar.getInstance().getTimeInMillis() - timestamp > VALID_MIN * 1000 * 60) { messageToResponse.remove(key); logger.debug("移除超时响应:" + key); } } catch (NumberFormatException e) { logger.error("定时整理响应队列时时间戳转换为整数异常", e); } catch (JSONException e) { logger.error("定时整理响应队列时获取时间戳异常", e); } } } } }