using SensorHub.Servers; using SensorHub.Servers.Commands.CASICCommands; using SensorHub.Servers.JsonFormat; using SensorHub.Servers.SM4; using SuperSocket.SocketBase.Command; using SuperSocket.SocketBase.Protocol; using System; using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using System.Configuration; using SuperSocket.SocketBase; using System.Reflection; namespace SensorHub.Noise { public class Noise : CommandBase<CasicSession, StringRequestInfo> { public override void ExecuteCommand(CasicSession session, StringRequestInfo requestInfo) { //TODO: construct the receving casic data String preamble = requestInfo.Parameters[0]; String version = requestInfo.Parameters[1]; String leng = requestInfo.Parameters[2]; String devCode = requestInfo.Parameters[3]; String routeFlag = requestInfo.Parameters[4]; String dstNodeAddr = requestInfo.Parameters[5]; String pduType = requestInfo.Parameters[6]; String seq = requestInfo.Parameters[7]; String settings = requestInfo.Parameters[8]; String source = requestInfo.Parameters[9]; String devName = "Noise"; if (source.Contains("-")) { session.Send("HTTP/1.1 200 OK\r\n\r\n\r\n"); session.Close(); } String devType = "噪声记录仪"; String operType = Common.getOpeTypeByPdu(pduType); // session.Logger.Info("AD接收数据:" + requestInfo.Body); session.Logger.Info("设备编号:" + devCode); session.Logger.Info("设备类型:" + devType); session.Logger.Info("操作类型:" + operType); session.Logger.Info("包序列号:" + seq); session.Logger.Info("会话:" + session.HubAddr + "," + session.SessionID); //判断是返回的设置确认数据帧, 回复第三方 if (operType == "SetResponse") { Common.sendSetResponse(session, devCode, devName); return; } List<Tag> tags = Common.getTags(settings, session); //具体业务处理 String collectDate = ""; int cell = -1; int? pci = null; int? rsrp = null; int? snr = null; String softwareVersion = ""; uint offset = 0; uint size = 0; List<String> eventList = new List<String>(); List<DatasJson> datasList = new List<DatasJson>(); List<DatasJson> corrDataList = new List<DatasJson>(); List<String> startupList = new List<String>(); foreach (Tag tag in tags) { if (!(tag is UploadTag)) { if (tag != null && tag is CellTag) { CellTag cellTag = (CellTag)tag; cell = cellTag.Cell; session.Logger.Info("设备[" + devCode + "]电量Tag: " + cell); continue; } if (tag != null && tag is PCITag) { PCITag pciTag = (PCITag)tag; pci = pciTag.PCI; session.Logger.Info("设备[" + devCode + "]信号强度PCITag: " + pci); continue; } if (tag != null && tag is RSRPTag) { RSRPTag rsrpTag = (RSRPTag)tag; rsrp = rsrpTag.RSRP; continue; } if (tag != null && tag is SNRTag) { SNRTag snrTag = (SNRTag)tag; snr = snrTag.SNR; continue; } //非业务处理 if (tag != null && tag is SystemDateTag) { SystemDateTag systemDateTag = (SystemDateTag)tag; collectDate = systemDateTag.CollectDate; session.Logger.Info("设备[" + devCode + "]采集日期Tag: " + collectDate); continue; } if (tag != null && tag is SensorException0Tag) { SensorException0Tag sensorException0 = tag as SensorException0Tag; int state = sensorException0.state; if (state == 0) continue; eventList.Add(getNoiseAlarm(state)); session.Logger.Info("通道一发送容错信息:oid:" + tag.Oid + ";value:" + state); continue; } if (tag != null && tag is SensorException1Tag) { SensorException1Tag sensorException1 = tag as SensorException1Tag; int state = sensorException1.state; if (state == 0) continue; eventList.Add(getNoiseAlarm(state)); session.Logger.Info("通道二发送容错信息:oid:" + tag.Oid + ";value:" + state); continue; } if (tag != null && tag is SensorStartupTag) { SensorStartupTag sensorStartup = tag as SensorStartupTag; String imei = sensorStartup.IMEI; String iccid = sensorStartup.ICCID; startupList.Add(imei); startupList.Add(iccid); session.Logger.Info("设备开机上报,设备编号DEVCODE:" + devCode + " IMEI:" + imei + " ICCID:" + iccid); continue; } //非业务处理 if (tag != null && tag is SoftwareVersionTag) { SoftwareVersionTag versionTag = (SoftwareVersionTag)tag; softwareVersion = versionTag.Version; session.Logger.Info("设备[" + devCode + "]请求远程升级:" + softwareVersion); continue; } if (tag != null && tag is OffsetTag) { OffsetTag offsetTag = (OffsetTag)tag; offset = offsetTag.Offset; continue; } if (tag != null && tag is SizeTag) { SizeTag sizeTag = (SizeTag)tag; size = sizeTag.Size; continue; } } else { //业务处理 UploadTag uploadTag = tag as UploadTag; switch (uploadTag.BizType) { case 4: // 噪声 TagHandler noiseHandler = new NoiseTagHandler(); noiseHandler.resolve(tag, session); DateTime baseTime = Convert.ToDateTime(collectDate + " " + noiseHandler.CollecTime); for (int i = 0; i < noiseHandler.DataList.Count / 2; i++) { DateTime upTime = baseTime.AddMinutes(i * noiseHandler.Interval); String uptime = upTime.ToString("yyyy") + upTime.ToString("MM") + upTime.ToString("dd") + upTime.ToString("HH") + upTime.ToString("mm") + upTime.ToString("ss"); datasList.Add(new NoiseDatasJson(uptime, (double)noiseHandler.DataList[i * 2], (int)noiseHandler.DataList[i * 2 + 1])); } break; case 21: // 海量加速度值 TagHandler accelerationHandler = new AccelerationTagHandler(); accelerationHandler.resolve(tag, session); // 十进制的包序号 int iSeq = Convert.ToInt32(seq, 16) - 1; if (accelerationHandler.Data != null) { if (!AccelerationDatas.accelerationDataFinishMap.ContainsKey(devCode)) { AccelerationDatas.accelerationDataFinishMap.Add(devCode, false); } if (AccelerationDatas.accelerationDataSeqMap.ContainsKey(devCode)) { if (iSeq < 32) AccelerationDatas.accelerationDataHighSeqMap[devCode] |= (UInt32)(1 << 31 - iSeq); else AccelerationDatas.accelerationDataLowSeqMap[devCode] |= (UInt32)(1 << 63 - iSeq); AccelerationDatas.accelerationDataSeqMap[devCode] = ((UInt64)AccelerationDatas.accelerationDataHighSeqMap[devCode] << 32) + AccelerationDatas.accelerationDataLowSeqMap[devCode]; } else { if (iSeq < 32) { AccelerationDatas.accelerationDataHighSeqMap.Add(devCode, 0x0 | (UInt32)(1 << 31 - iSeq)); AccelerationDatas.accelerationDataLowSeqMap.Add(devCode, 0x0000007F); } else { AccelerationDatas.accelerationDataHighSeqMap.Add(devCode, 0x0); AccelerationDatas.accelerationDataLowSeqMap.Add(devCode, 0x0000007F | (UInt32)(1 << 63 - iSeq)); } AccelerationDatas.accelerationDataSeqMap.Add(devCode, ((UInt64)AccelerationDatas.accelerationDataHighSeqMap[devCode] << 32) + AccelerationDatas.accelerationDataLowSeqMap[devCode]);// 初始值64位 有效位57位 0-6位为无效位 } session.Logger.Info("[" + devCode + "]已经缓存的数据序列包:" + AccelerationDatas.accelerationDataSeqMap[devCode].ToString("X") + ": " + AccelerationDatas.accelerationDataHighSeqMap[devCode].ToString("X") + "; " + AccelerationDatas.accelerationDataLowSeqMap[devCode].ToString("X")); string data = accelerationHandler.Data as string; if (AccelerationDatas.accelerationDataValueMap.ContainsKey(devCode)) { AccelerationDatas.accelerationDataValueMap[devCode][iSeq] = data; } else { string[] datas = new string[57]; // 每包200个字节 一共分成57个包上传 datas[iSeq] = data; AccelerationDatas.accelerationDataValueMap.Add(devCode, datas); } if (IsAccelerationDatafinished(pduType)) { byte[] responseFrame = buildAccelerationDataReceived(devCode, new byte[2] { 0x05, 0x82 }); string responseBase64 = Convert.ToBase64String(responseFrame); // 转成base64下发 if (AccelerationDatas.accelerationDataSeqMap[devCode] == 0xFFFFFFFFFFFFFFFF) // 判断数据包是否完整接收 { string datas = ""; foreach (var d in AccelerationDatas.accelerationDataValueMap[devCode]) { datas += d; } session.Logger.Info("接收完毕,总数据长度:" + datas.Length / 2); // session.Logger.Info(datas); // 各采样点的噪声值 double[] realValue = new double[datas.Length / 6]; // 噪声值总计 double sumNoise = 0.0; for (int i = 0; i < realValue.Length; i++) { string tripleByteString = datas.Substring(i * 6, 6); byte[] triple = StrToHexByte(tripleByteString); realValue[i] = HexToDouble(triple); sumNoise += Math.Abs(realValue[i]); // session.Logger.Info(i + ": " + tripleByteString + "; " + HexToDouble(triple)); } AccelerationDatas.acclerationValueMap.Add(devCode, realValue); // 向第三方发送数据(批产应用) string uptime = DateTime.Now.ToString("yyyyMMddHHmmss"); NoiseDatasJson noiData = new NoiseDatasJson(uptime, sumNoise * 2, 0); datasList.Add(noiData); // 采样率为7500的一半 总噪声值×2 Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); // 临时方案:向kafka发送数据(第三方集成) Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); // 临时方案二,向相关仪应用的kafka队列发送包含原始采样点值的数据 noiData.noiseSampleValues = realValue; corrDataList.Add(noiData); Common.CorrKafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, corrDataList); // 创建存盘线程 Thread savethread = new Thread(SavetoFlie) { IsBackground = true//将线程设置为后台线程 }; savethread.Start(devCode); //开启存盘线程 // 处理完毕 清除缓存 AccelerationDatas.accelerationDataFinishMap[devCode] = true; AccelerationDatas.accelerationDataSeqMap.Remove(devCode); AccelerationDatas.accelerationDataHighSeqMap.Remove(devCode); AccelerationDatas.accelerationDataLowSeqMap.Remove(devCode); AccelerationDatas.accelerationDataValueMap.Remove(devCode); AccelerationDatas.accelerationDataFinishMap.Remove(devCode); } // 发送响应 Common.SendAEPCommand(session, responseBase64, source); } return; } break; default: session.Logger.Info("未知业务类型!"); break; } } } // Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); // Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); byte[] btPdu = new byte[2]; //2个字节 btPdu[0] = Common.getRespOperType(operType, source == "433" ? true : false); btPdu[1] = 0x82; if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { session.Logger.Info("进入远程升级流程[" + devName + " " + devCode + " " + softwareVersion + "]"); Common.remoteUpgrade(session, operType, devName, devCode, btPdu, softwareVersion, size, offset, source); // return; } // 回复校时信息 Common.sendConfig(session, devCode, routeFlag, source, btPdu); } private bool IsAccelerationDatafinished(String pduType) { Int16 btpduType = Int16.Parse(pduType, System.Globalization.NumberStyles.HexNumber); int flag = btpduType & 0x80; return flag == 128 ? true : false; } private String getNoiseAlarm(int state) { switch (state) { case 0: return "NoiseNormal";//正常 case 1: return "NoiseFail"; //采集失败 case 2: return "NoiseError"; //数据异常 default: return "NoiseUnknown"; //未知异常 } } private byte[] buildAccelerationDataReceived(string devCode, byte[] btPdu) { byte[] frame = { 0xA3, 0x20, //帧头 - 固定 0x00, 0x26, //长度 - 固定 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, //设备编号 0x03, //通信方式 - 固定 0x00, 0x00, //目标节点地址 0x00, 0x00, //PDUType 0x01, //Seq - 不分包 固定 0x10, 0x00, 0x00, 0x51, 0x00, 0x06, //时间戳 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0x00, 0x03, 0x00, 0x00, 0x08, //数据接收状态 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // 设备编号字段 byte[] btDevCode = StrToHexByte(devCode); btDevCode.CopyTo(frame, 4); Array.Copy(btDevCode, 4, frame, 11, 2); // 目标节点地址 = 设备编号最后2个字节 // pduType字段 btPdu.CopyTo(frame, 13); // 时间戳字段 SystemTimeConfig sysTimeConfig = new SystemTimeConfig(null); byte[] btConfig = sysTimeConfig.getConfig(new byte[0]); btConfig.CopyTo(frame, 16); // 数据接收状态字段 if (AccelerationDatas.accelerationDataSeqMap.ContainsKey(devCode)) { byte[] value = System.BitConverter.GetBytes(AccelerationDatas.accelerationDataSeqMap[devCode]); Array.Reverse(value, 0, value.Length); value.CopyTo(frame, 34); } // 查询是否有需要下发的配置项 byte[] configBytes = Common.BuildConfigBytes(devCode); if (configBytes.Length > 0) { Array.Resize(ref frame, frame.Length + configBytes.Length); // 扩展长度 configBytes.CopyTo(frame, 42); // 在后面加上其他配置项 int length = frame.Length - 4; byte[] btLens = new byte[2]; //数据帧长度 byte[] btlens0 = BitConverter.GetBytes(length); btLens[0] = btlens0[1]; btLens[1] = btlens0[0]; btLens.CopyTo(frame, 2); } // SM4加密 byte[] tag = new byte[frame.Length - 16]; Array.Copy(frame, 16, tag, 0, tag.Length); // 需要加密的内容 byte[] enTag; // 加密后的内容 enTag = SM4Utils.sm4Encrypt(tag, devCode); byte[] result = new byte[1 + 1 + 2 + 6 + 1 + 2 + 2 + 1 + enTag.Length]; Array.Copy(frame, result, 16); enTag.CopyTo(result, 16); byte[] afcrc = Common.CRC(result); return afcrc; } /// <summary> /// 字符串转16进制字节数组 /// </summary> /// <param name="hexString"></param> /// <returns></returns> private byte[] StrToHexByte(string hexString) { hexString = hexString.Replace(" ", ""); if ((hexString.Length % 2) != 0) hexString += "0"; byte[] returnBytes = new byte[hexString.Length / 2]; for (int i = 0; i < returnBytes.Length; i++) returnBytes[i] = Convert.ToByte(hexString.Substring(i * 2, 2), 16); return returnBytes; } private string ByteToHexString(byte[] byteFrame) { StringBuilder builder = new StringBuilder(); for (int i = 0; i < byteFrame.Length; i++) { builder.Append(string.Format("{0:X2}", byteFrame[i])); } return builder.ToString().Trim(); } /// <summary> /// 三字节16进制数组 转 double /// </summary> /// <param name="src"></param> /// <returns></returns> private double HexToDouble(byte[] tripleArray) { if (tripleArray.Length != 3) return 0; short result1 = tripleArray[0]; short result2 = tripleArray[1]; short result3 = tripleArray[2]; if ((result1 & 0x80) == 0x80) { result1 = Convert.ToInt16(result1 - 255); result2 = Convert.ToInt16(result2 - 255); result3 = Convert.ToInt16(result3 - 255); } var data = (result1 * 65536 + result2 * 256 + result3) * 5 / 83.88607 / 100000; return data; } private void SavetoFlie(object obj)//存文件方法 { // 如果有这个文件(参数是存储位置+文件名称,打开方式)打开文件末尾如果没有创建一个文件流的类并加入参数后实例化 string path = Common.GetWindowsServiceInstallPath(ConfigurationManager.AppSettings["ServiceName"]) + "\\Datas\\"; FileStream fileStream = new FileStream(path + DateTime.Now.ToString("yyyyMMddHHmmss-") + obj + ".noi", FileMode.Append); // 创建一个输入流的类并加入参数(文件路径和编码格式)实例化 StreamWriter write = new StreamWriter(fileStream, Encoding.Default); StringBuilder exdata = new StringBuilder();//这里是创建写入数据的StringBuilder对象 try { for (int i = 0; i < AccelerationDatas.acclerationValueMap[(string) obj].Length; i++) { exdata.Append(AccelerationDatas.acclerationValueMap[(string)obj][i]).AppendLine(); //添加StringBuilder字符串内容 } write.Write(exdata); //向目标文件中写入数据 } catch (Exception ex) { Console.WriteLine("存储数据到文件时发生错误:" + ex.Message); } finally { // 关闭文件流 write.Close(); fileStream.Close(); AccelerationDatas.acclerationValueMap.Remove((string)obj); } } } }