/*
 * Source: pgdsc/src/com/szpg/plc/server/FINSCommandPool.java
 * Last changed by admin on 12 Jan 2018 (5 KB).
 */
package com.szpg.plc.server;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import com.szpg.util.Configure;

import z.json.JSONException;
import z.json.JSONObject;

/**
 * Pool that pairs outgoing commands with their responses.
 *
 * <p>Two pools are kept:
 * <ul>
 *   <li>the <em>send pool</em>: per session id, the list of commands that have been
 *       sent and are still waiting for a response;</li>
 *   <li>the <em>response pool</em>: responses that were matched to a pending command,
 *       keyed by {@code sessionId + "-" + rtu + "-" + type}.</li>
 * </ul>
 *
 * <p>Thread-safety: {@link #pushMessage(String)}, {@link #match(String)}, {@link #refresh()},
 * {@link #shutdown()} and the periodic cleanup all hold this pool's monitor. The response
 * pool is a {@link ConcurrentHashMap} so that code using the map returned by
 * {@link #getMessageToResponse()} can read and remove entries from other threads.
 */
public class FINSCommandPool {

	/** Send pool: session id -> pending commands (each holding "rtu", "type", "timestamp"). */
	private final Map<String, List<JSONObject>> messageToSend;
	/** Response pool: "sessionId-rtu-type" -> matched response. */
	private final Map<String, JSONObject> messageToResponse;

	/** How long (in minutes) an entry stays in either pool before the cleanup removes it. */
	private final int VALID_MIN = Integer.parseInt(Configure.getProperty("sys", "POOL.VALID_TIME", "5"));
	/**
	 * Period (in minutes) between two cleanup runs.
	 * NOTE(review): the key "POOL.POOL.REFRESH_TIME" repeats the "POOL." prefix, unlike
	 * "POOL.VALID_TIME" above; this looks like a typo but is kept unchanged because the
	 * configuration file is not visible here. Confirm against the deployed config.
	 */
	private final int FRESH_MIN	= Integer.parseInt(Configure.getProperty("sys", "POOL.POOL.REFRESH_TIME", "5"));

	private final Logger logger = Logger.getLogger(this.getClass().getName());

	/** Executor running the periodic cleanup; {@code null} until {@link #refresh()} is called. */
	private ScheduledThreadPoolExecutor refreshExecutor;

	/** Creates a pool with an empty send pool and an empty response pool. */
	public FINSCommandPool() {
		messageToSend = new HashMap<String, List<JSONObject>>();
		// Concurrent map: this instance is handed out by getMessageToResponse() and is
		// also modified by the cleanup thread.
		messageToResponse = new ConcurrentHashMap<String, JSONObject>();
	}

	/**
	 * Returns the live response pool (not a copy).
	 *
	 * @return map from {@code sessionId + "-" + rtu + "-" + type} to the matched response;
	 *         the map does not accept {@code null} keys or values
	 */
	public Map<String, JSONObject> getMessageToResponse() {
		return messageToResponse;
	}

	/**
	 * Records a newly sent command in the send pool.
	 *
	 * <p>Only the "rtu" and "type" fields of the message are stored, together with the
	 * current time in milliseconds (as a string) under "timestamp".
	 *
	 * @param messageStr JSON string containing at least "sessionId", "rtu" and "type"
	 * @throws JSONException if the string cannot be parsed or a required field is missing
	 */
	public synchronized void pushMessage(String messageStr) throws JSONException {
		JSONObject jMessage = new JSONObject(messageStr);

		String sessionId = jMessage.getString("sessionId");
		String rtuAddress = jMessage.getString("rtu");
		String type = jMessage.getString("type");
		String time = String.valueOf(System.currentTimeMillis());

		JSONObject item = new JSONObject();
		item.put("rtu", rtuAddress);
		item.put("type", type);
		item.put("timestamp", time);

		List<JSONObject> pending = messageToSend.get(sessionId);
		if (pending == null) {
			pending = new ArrayList<JSONObject>();
			messageToSend.put(sessionId, pending);
		}
		pending.add(item);
	}

	/**
	 * Matches a response against the pending commands of its session.
	 *
	 * <p>The session's commands are searched from the newest to the oldest for one with the
	 * same "rtu" and the same "type" (case-insensitive). On the first hit the response is
	 * stored in the response pool and the command is removed from the send pool. If nothing
	 * matches, the response is dropped.
	 *
	 * @param responseStr JSON string containing at least "sessionId", "rtu" and "type"
	 * @throws JSONException if the string cannot be parsed or a required field is missing
	 */
	public synchronized void match(String responseStr) throws JSONException {
		JSONObject jResponse = new JSONObject(responseStr);

		String sessionId = jResponse.getString("sessionId");
		String rtuAddress = jResponse.getString("rtu");
		String type = jResponse.getString("type");
		// Stored as a number (unlike pushMessage, which stores a string); left unchanged
		// because consumers of the response pool may read this field.
		jResponse.put("timestamp", System.currentTimeMillis());

		List<JSONObject> pending = messageToSend.get(sessionId);
		if (pending == null || pending.isEmpty()) {
			return;
		}

		// Walk from the tail so the most recently sent command wins.
		for (int i = pending.size() - 1; i >= 0; i--) {
			JSONObject message = pending.get(i);
			if (message.getString("rtu").equals(rtuAddress)
					&& message.getString("type").equalsIgnoreCase(type)) {
				messageToResponse.put(sessionId + "-" + rtuAddress + "-" + type, jResponse);
				pending.remove(i);
				break;
			}
		}
	}

	/**
	 * Starts the periodic cleanup of both pools: first run immediately, then every
	 * {@code FRESH_MIN} minutes on a single-threaded scheduler.
	 *
	 * <p>Calling this method again while the cleanup is already scheduled does nothing, so
	 * repeated calls no longer create additional scheduler threads.
	 */
	public synchronized void refresh() {
		if (refreshExecutor != null) {
			return;
		}
		ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
		executor.scheduleAtFixedRate(new RefreshPoolTask(), 0, FRESH_MIN, TimeUnit.MINUTES);
		// Assign only after scheduling succeeded, so a rejected period can be retried.
		refreshExecutor = executor;
	}

	/**
	 * Stops the periodic cleanup started by {@link #refresh()} and releases its thread.
	 * Does nothing if the cleanup is not running; {@link #refresh()} may be called again
	 * afterwards.
	 */
	public synchronized void shutdown() {
		if (refreshExecutor != null) {
			refreshExecutor.shutdownNow();
			refreshExecutor = null;
		}
	}

	/**
	 * Removes expired commands from the send pool and drops sessions left without commands.
	 * Must be called while holding this pool's monitor.
	 *
	 * @param now         current time in milliseconds
	 * @param validMillis maximum age of an entry in milliseconds
	 */
	private void purgeExpiredCommands(long now, long validMillis) {
		Iterator<Map.Entry<String, List<JSONObject>>> sessions = messageToSend.entrySet().iterator();
		while (sessions.hasNext()) {
			List<JSONObject> list = sessions.next().getValue();

			Iterator<JSONObject> items = list.iterator();
			while (items.hasNext()) {
				JSONObject item = items.next();
				try {
					long timestamp = Long.parseLong(item.getString("timestamp"));

					if (now - timestamp > validMillis) {
						// Iterator.remove() is the only safe way to delete while iterating.
						items.remove();
						logger.debug("移除该超时命令:" + item.toString());
					}
				} catch (NumberFormatException e) {
					logger.error("定时整理消息队列时时间戳转换为整数异常", e);
				} catch (JSONException e) {
					logger.error("定时整理消息队列时获取时间戳异常", e);
				}
			}

			if (list.isEmpty()) {
				sessions.remove();
			}
		}
	}

	/**
	 * Removes expired responses from the response pool.
	 *
	 * <p>NOTE(review): {@link #match(String)} stores "timestamp" as a number while this
	 * method reads it with {@code getString}. Whether that succeeds depends on the JSON
	 * library; if it throws, the error is logged and the response is never expired.
	 * Confirm against the z.json implementation.
	 *
	 * @param now         current time in milliseconds
	 * @param validMillis maximum age of an entry in milliseconds
	 */
	private void purgeExpiredResponses(long now, long validMillis) {
		Iterator<Map.Entry<String, JSONObject>> responses = messageToResponse.entrySet().iterator();
		while (responses.hasNext()) {
			Map.Entry<String, JSONObject> entry = responses.next();
			try {
				long timestamp = Long.parseLong(entry.getValue().getString("timestamp"));

				if (now - timestamp > validMillis) {
					responses.remove();
					logger.debug("移除超时响应:" + entry.getKey());
				}
			} catch (NumberFormatException e) {
				logger.error("定时整理响应队列时时间戳转换为整数异常", e);
			} catch (JSONException e) {
				logger.error("定时整理响应队列时获取时间戳异常", e);
			}
		}
	}

	/**
	 * Periodic task scheduled by {@link #refresh()}: removes entries older than
	 * {@code VALID_MIN} minutes from the send pool and the response pool.
	 */
	class RefreshPoolTask implements Runnable {
		@Override
		public void run() {
			try {
				// Lock the pool itself (not this task) so the cleanup cannot interleave
				// with pushMessage()/match().
				synchronized (FINSCommandPool.this) {
					long now = System.currentTimeMillis();
					// long arithmetic: avoids int overflow for large VALID_MIN values.
					long validMillis = TimeUnit.MINUTES.toMillis(VALID_MIN);

					purgeExpiredCommands(now, validMillis);
					purgeExpiredResponses(now, validMillis);
				}
			} catch (RuntimeException e) {
				// An exception escaping a scheduleAtFixedRate task cancels all future
				// runs, so log it here and keep the schedule alive.
				logger.error("定时整理消息池时发生未预期异常", e);
			}
		}

	}
}