diff --git a/SensorHub.Methane/Methane.cs b/SensorHub.Methane/Methane.cs index d5f5f29..d5bf37b 100644 --- a/SensorHub.Methane/Methane.cs +++ b/SensorHub.Methane/Methane.cs @@ -58,9 +58,14 @@ return; } + Mutex mutex = new Mutex(); + mutex.WaitOne(); + //获取电量信息,系统时间,传递给对应的handler List tags = Common.getTags(settings, session); + mutex.ReleaseMutex(); + //具体业务处理 String collectDate = ""; int cell = -1; @@ -76,9 +81,6 @@ List datasList = new List(); List startupList = new List(); - Mutex mutex = new Mutex(); - mutex.WaitOne(); - foreach (Tag tag in tags) { if (!(tag is UploadTag)) @@ -197,15 +199,7 @@ } // 发送到flume或者kafka - if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) - { - Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) - { - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } - - mutex.ReleaseMutex(); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Methane/Methane.cs b/SensorHub.Methane/Methane.cs index d5f5f29..d5bf37b 100644 --- a/SensorHub.Methane/Methane.cs +++ b/SensorHub.Methane/Methane.cs @@ -58,9 +58,14 @@ return; } + Mutex mutex = new Mutex(); + mutex.WaitOne(); + //获取电量信息,系统时间,传递给对应的handler List tags = Common.getTags(settings, session); + mutex.ReleaseMutex(); + //具体业务处理 String collectDate = ""; int cell = -1; @@ -76,9 +81,6 @@ List datasList = new List(); List startupList = new List(); - Mutex mutex = new Mutex(); - mutex.WaitOne(); - foreach (Tag tag in tags) { if (!(tag is UploadTag)) @@ -197,15 +199,7 @@ } // 发送到flume或者kafka - if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) - { - Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) - { - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } - - mutex.ReleaseMutex(); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs index dae96bc..e73098b 100644 --- a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs +++ b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs @@ -22,9 +22,9 @@ public SystemDateTag(String oid, int len, String dataValue) : base(oid, len, dataValue) { - int year = 2000+Int32.Parse(this.DataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); - int month = Int32.Parse(this.DataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); - int day = Int32.Parse(this.DataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); + int year = 2000+Int32.Parse(dataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); + int month = Int32.Parse(dataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); + int day = Int32.Parse(dataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); collectDate = year + "-" + month + "-" + day; } diff --git a/SensorHub.Methane/Methane.cs b/SensorHub.Methane/Methane.cs index d5f5f29..d5bf37b 100644 --- a/SensorHub.Methane/Methane.cs +++ b/SensorHub.Methane/Methane.cs @@ -58,9 +58,14 @@ return; } + Mutex mutex = new Mutex(); + mutex.WaitOne(); + //获取电量信息,系统时间,传递给对应的handler List tags = Common.getTags(settings, session); + mutex.ReleaseMutex(); + //具体业务处理 String collectDate = ""; int cell = -1; @@ -76,9 +81,6 @@ List datasList = new List(); List startupList = new List(); - Mutex mutex = new Mutex(); - mutex.WaitOne(); - foreach (Tag tag in tags) { if (!(tag is UploadTag)) @@ -197,15 +199,7 @@ } // 发送到flume或者kafka - if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) - { - Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) - { - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } - - mutex.ReleaseMutex(); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs index dae96bc..e73098b 100644 --- a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs +++ b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs @@ -22,9 +22,9 @@ public SystemDateTag(String oid, int len, String dataValue) : base(oid, len, dataValue) { - int year = 2000+Int32.Parse(this.DataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); - int month = Int32.Parse(this.DataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); - int day = Int32.Parse(this.DataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); + int year = 2000+Int32.Parse(dataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); + int month = Int32.Parse(dataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); + int day = Int32.Parse(dataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); collectDate = year + "-" + month + "-" + day; } diff --git a/SensorHub.Servers/Common.cs b/SensorHub.Servers/Common.cs index 3572282..6fbe9e7 100644 --- a/SensorHub.Servers/Common.cs +++ b/SensorHub.Servers/Common.cs @@ -594,7 +594,9 @@ session.Logger.Info("往kafka写入数据:" + message); } - + /** + * 没有信号强度的字段 + * */ public static void sendMessage(CasicSession session, String devName, String devCode, int cell, List eventList, List datasList, List startupList) { DateTime now = DateTime.Now; @@ -612,13 +614,21 @@ String message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, datasList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } @@ -627,13 +637,21 @@ String message = JsonConvert.SerializeObject(new Json("Event", devName, devCode, new EventJson(devName + "Event", eventList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送事件:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入事件:" + message); } } @@ -642,13 +660,21 @@ String message = JsonConvert.SerializeObject(new Json("StartupRequest", devName, devCode, new StartupJson(devName + "Data", startupList[0], startupList[1]), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } } @@ -658,7 +684,9 @@ } } - + /** + * 包含信号强度 + * */ public static void sendMessage(CasicSession session, String devName, String devCode, int cell, int? pci, int? rsrp, int? snr, List eventList, List datasList, List startupList) { @@ -677,13 +705,20 @@ String message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, pci, rsrp, snr, datasList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); - } - else + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } @@ -692,13 +727,20 @@ String message = JsonConvert.SerializeObject(new Json("Event", devName, devCode, new EventJson(devName + "Event", eventList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); - } - else + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送事件:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入事件:" + message); } } @@ -707,13 +749,21 @@ String message = JsonConvert.SerializeObject(new Json("StartupRequest", devName, devCode, new StartupJson(devName + "Data", startupList[0], startupList[1]), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } } @@ -786,8 +836,24 @@ redis.KeyDelete(devCode); } } + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) + { + String message = JsonConvert.SerializeObject(new Json("SetResponse", devName, devCode, + new JsonBody(devName + "ConfigSuccess"), getTimeStamp())); - Common.kafkaSetResponseProduce(session, devCode, devName); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) + { + Common.kafkaSetResponseProduce(session, devCode, devName); + } } //public static void sendSetResponse(CasicSession session, String devCode, String devName) diff --git a/SensorHub.Methane/Methane.cs b/SensorHub.Methane/Methane.cs index d5f5f29..d5bf37b 100644 --- a/SensorHub.Methane/Methane.cs +++ b/SensorHub.Methane/Methane.cs @@ -58,9 +58,14 @@ return; } + Mutex mutex = new Mutex(); + mutex.WaitOne(); + //获取电量信息,系统时间,传递给对应的handler List tags = Common.getTags(settings, session); + mutex.ReleaseMutex(); + //具体业务处理 String collectDate = ""; int cell = -1; @@ -76,9 +81,6 @@ List datasList = new List(); List startupList = new List(); - Mutex mutex = new Mutex(); - mutex.WaitOne(); - foreach (Tag tag in tags) { if (!(tag is UploadTag)) @@ -197,15 +199,7 @@ } // 发送到flume或者kafka - if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) - { - Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) - { - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } - - mutex.ReleaseMutex(); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs index dae96bc..e73098b 100644 --- a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs +++ b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs @@ -22,9 +22,9 @@ public SystemDateTag(String oid, int len, String dataValue) : base(oid, len, dataValue) { - int year = 2000+Int32.Parse(this.DataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); - int month = Int32.Parse(this.DataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); - int day = Int32.Parse(this.DataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); + int year = 2000+Int32.Parse(dataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); + int month = Int32.Parse(dataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); + int day = Int32.Parse(dataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); collectDate = year + "-" + month + "-" + day; } diff --git a/SensorHub.Servers/Common.cs b/SensorHub.Servers/Common.cs index 3572282..6fbe9e7 100644 --- a/SensorHub.Servers/Common.cs +++ b/SensorHub.Servers/Common.cs @@ -594,7 +594,9 @@ session.Logger.Info("往kafka写入数据:" + message); } - + /** + * 没有信号强度的字段 + * */ public static void sendMessage(CasicSession session, String devName, String devCode, int cell, List eventList, List datasList, List startupList) { DateTime now = DateTime.Now; @@ -612,13 +614,21 @@ String message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, datasList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } @@ -627,13 +637,21 @@ String message = JsonConvert.SerializeObject(new Json("Event", devName, devCode, new EventJson(devName + "Event", eventList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送事件:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入事件:" + message); } } @@ -642,13 +660,21 @@ String message = JsonConvert.SerializeObject(new Json("StartupRequest", devName, devCode, new StartupJson(devName + "Data", startupList[0], startupList[1]), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } } @@ -658,7 +684,9 @@ } } - + /** + * 包含信号强度 + * */ public static void sendMessage(CasicSession session, String devName, String devCode, int cell, int? pci, int? rsrp, int? snr, List eventList, List datasList, List startupList) { @@ -677,13 +705,20 @@ String message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, pci, rsrp, snr, datasList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); - } - else + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } @@ -692,13 +727,20 @@ String message = JsonConvert.SerializeObject(new Json("Event", devName, devCode, new EventJson(devName + "Event", eventList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); - } - else + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送事件:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入事件:" + message); } } @@ -707,13 +749,21 @@ String message = JsonConvert.SerializeObject(new Json("StartupRequest", devName, devCode, new StartupJson(devName + "Data", startupList[0], startupList[1]), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } } @@ -786,8 +836,24 @@ redis.KeyDelete(devCode); } } + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) + { + String message = JsonConvert.SerializeObject(new Json("SetResponse", devName, devCode, + new JsonBody(devName + "ConfigSuccess"), getTimeStamp())); - Common.kafkaSetResponseProduce(session, devCode, devName); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) + { + Common.kafkaSetResponseProduce(session, devCode, devName); + } } //public static void sendSetResponse(CasicSession session, String devCode, String devName) diff --git a/SensorHub.Tube/Tube.cs b/SensorHub.Tube/Tube.cs index e75dc6c..c0e3181 100644 --- a/SensorHub.Tube/Tube.cs +++ b/SensorHub.Tube/Tube.cs @@ -52,7 +52,7 @@ //判断是返回的设置确认数据帧, 回复第三方 if (operType == "SetResponse") { - Common.sendSetResponse(session, devCode, "Tube"); + Common.sendSetResponse(session, devCode, devName); return; } @@ -191,8 +191,7 @@ } } - // Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Methane/Methane.cs b/SensorHub.Methane/Methane.cs index d5f5f29..d5bf37b 100644 --- a/SensorHub.Methane/Methane.cs +++ b/SensorHub.Methane/Methane.cs @@ -58,9 +58,14 @@ return; } + Mutex mutex = new Mutex(); + mutex.WaitOne(); + //获取电量信息,系统时间,传递给对应的handler List tags = Common.getTags(settings, session); + mutex.ReleaseMutex(); + //具体业务处理 String collectDate = ""; int cell = -1; @@ -76,9 +81,6 @@ List datasList = new List(); List startupList = new List(); - Mutex mutex = new Mutex(); - mutex.WaitOne(); - foreach (Tag tag in tags) { if (!(tag is UploadTag)) @@ -197,15 +199,7 @@ } // 发送到flume或者kafka - if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) - { - Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) - { - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } - - mutex.ReleaseMutex(); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs index dae96bc..e73098b 100644 --- a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs +++ b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs @@ -22,9 +22,9 @@ public SystemDateTag(String oid, int len, String dataValue) : base(oid, len, dataValue) { - int year = 2000+Int32.Parse(this.DataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); - int month = Int32.Parse(this.DataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); - int day = Int32.Parse(this.DataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); + int year = 2000+Int32.Parse(dataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); + int month = Int32.Parse(dataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); + int day = Int32.Parse(dataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); collectDate = year + "-" + month + "-" + day; } diff --git a/SensorHub.Servers/Common.cs b/SensorHub.Servers/Common.cs index 3572282..6fbe9e7 100644 --- a/SensorHub.Servers/Common.cs +++ b/SensorHub.Servers/Common.cs @@ -594,7 +594,9 @@ session.Logger.Info("往kafka写入数据:" + message); } - + /** + * 没有信号强度的字段 + * */ public static void sendMessage(CasicSession session, String devName, String devCode, int cell, List eventList, List datasList, List startupList) { DateTime now = DateTime.Now; @@ -612,13 +614,21 @@ String message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, datasList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } @@ -627,13 +637,21 @@ String message = JsonConvert.SerializeObject(new Json("Event", devName, devCode, new EventJson(devName + "Event", eventList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送事件:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入事件:" + message); } } @@ -642,13 +660,21 @@ String message = JsonConvert.SerializeObject(new Json("StartupRequest", devName, devCode, new StartupJson(devName + "Data", startupList[0], startupList[1]), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } } @@ -658,7 +684,9 @@ } } - + /** + * 包含信号强度 + * */ public static void sendMessage(CasicSession session, String devName, String devCode, int cell, int? pci, int? rsrp, int? snr, List eventList, List datasList, List startupList) { @@ -677,13 +705,20 @@ String message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, pci, rsrp, snr, datasList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); - } - else + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } @@ -692,13 +727,20 @@ String message = JsonConvert.SerializeObject(new Json("Event", devName, devCode, new EventJson(devName + "Event", eventList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); - } - else + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送事件:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入事件:" + message); } } @@ -707,13 +749,21 @@ String message = JsonConvert.SerializeObject(new Json("StartupRequest", devName, devCode, new StartupJson(devName + "Data", startupList[0], startupList[1]), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } } @@ -786,8 +836,24 @@ redis.KeyDelete(devCode); } } + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) + { + String message = JsonConvert.SerializeObject(new Json("SetResponse", devName, devCode, + new JsonBody(devName + "ConfigSuccess"), getTimeStamp())); - Common.kafkaSetResponseProduce(session, devCode, devName); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) + { + Common.kafkaSetResponseProduce(session, devCode, devName); + } } //public static void sendSetResponse(CasicSession session, String devCode, String devName) diff --git a/SensorHub.Tube/Tube.cs b/SensorHub.Tube/Tube.cs index e75dc6c..c0e3181 100644 --- a/SensorHub.Tube/Tube.cs +++ b/SensorHub.Tube/Tube.cs @@ -52,7 +52,7 @@ //判断是返回的设置确认数据帧, 回复第三方 if (operType == "SetResponse") { - Common.sendSetResponse(session, devCode, "Tube"); + Common.sendSetResponse(session, devCode, devName); return; } @@ -191,8 +191,7 @@ } } - // Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Well/Well.cs b/SensorHub.Well/Well.cs index 22a75ff..48546da 100644 --- a/SensorHub.Well/Well.cs +++ b/SensorHub.Well/Well.cs @@ -41,7 +41,14 @@ session.Logger.Info("设备类型:" + devType); session.Logger.Info("操作类型:" + operType); session.Logger.Info("会话:" + session.HubAddr + "," + session.SessionID); - + + //判断是返回的设置确认数据帧, 回复第三方 + if (operType == "SetResponse") + { + Common.sendSetResponse(session, devCode, devName); + return; + } + List tags = Common.getTags(settings, session); //具体业务处理 diff --git a/SensorHub.Methane/Methane.cs b/SensorHub.Methane/Methane.cs index d5f5f29..d5bf37b 100644 --- a/SensorHub.Methane/Methane.cs +++ b/SensorHub.Methane/Methane.cs @@ -58,9 +58,14 @@ return; } + Mutex mutex = new Mutex(); + mutex.WaitOne(); + //获取电量信息,系统时间,传递给对应的handler List tags = Common.getTags(settings, session); + mutex.ReleaseMutex(); + //具体业务处理 String collectDate = ""; int cell = -1; @@ -76,9 +81,6 @@ List datasList = new List(); List startupList = new List(); - Mutex mutex = new Mutex(); - mutex.WaitOne(); - foreach (Tag tag in tags) { if (!(tag is UploadTag)) @@ -197,15 +199,7 @@ } // 发送到flume或者kafka - if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) - { - Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) - { - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - } - - mutex.ReleaseMutex(); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs index dae96bc..e73098b 100644 --- a/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs +++ b/SensorHub.Servers/Commands/CASICCommands/SystemDateTag.cs @@ -22,9 +22,9 @@ public SystemDateTag(String oid, int len, String dataValue) : base(oid, len, dataValue) { - int year = 2000+Int32.Parse(this.DataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); - int month = Int32.Parse(this.DataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); - int day = Int32.Parse(this.DataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); + int year = 2000+Int32.Parse(dataValue.Substring(0,2), System.Globalization.NumberStyles.HexNumber); + int month = Int32.Parse(dataValue.Substring(2, 2), System.Globalization.NumberStyles.HexNumber); + int day = Int32.Parse(dataValue.Substring(4, 2), System.Globalization.NumberStyles.HexNumber); collectDate = year + "-" + month + "-" + day; } diff --git a/SensorHub.Servers/Common.cs b/SensorHub.Servers/Common.cs index 3572282..6fbe9e7 100644 --- a/SensorHub.Servers/Common.cs +++ b/SensorHub.Servers/Common.cs @@ -594,7 +594,9 @@ session.Logger.Info("往kafka写入数据:" + message); } - + /** + * 没有信号强度的字段 + * */ public static void sendMessage(CasicSession session, String devName, String devCode, int cell, List eventList, List datasList, List startupList) { DateTime now = DateTime.Now; @@ -612,13 +614,21 @@ String message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, datasList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } @@ -627,13 +637,21 @@ String message = JsonConvert.SerializeObject(new Json("Event", devName, devCode, new EventJson(devName + "Event", eventList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送事件:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入事件:" + message); } } @@ -642,13 +660,21 @@ String message = JsonConvert.SerializeObject(new Json("StartupRequest", devName, devCode, new StartupJson(devName + "Data", startupList[0], startupList[1]), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } } @@ -658,7 +684,9 @@ } } - + /** + * 包含信号强度 + * */ public static void sendMessage(CasicSession session, String devName, String devCode, int cell, int? pci, int? rsrp, int? snr, List eventList, List datasList, List startupList) { @@ -677,13 +705,20 @@ String message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, pci, rsrp, snr, datasList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); - } - else + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } @@ -692,13 +727,20 @@ String message = JsonConvert.SerializeObject(new Json("Event", devName, devCode, new EventJson(devName + "Event", eventList, logtime), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); - } - else + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送事件:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入事件:" + message); } } @@ -707,13 +749,21 @@ String message = JsonConvert.SerializeObject(new Json("StartupRequest", devName, devCode, new StartupJson(devName + "Data", startupList[0], startupList[1]), timeStamp)); - if (Common.SendMessage(message)) + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) { - session.Logger.Info("往第三方发送数据:" + message); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } } - else + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) { - session.Logger.Info("未连接上第三方服务器"); + KafkaUtils.produce(KafkaUtils.TOPIC, message); + session.Logger.Info("往kafka写入数据:" + message); } } } @@ -786,8 +836,24 @@ redis.KeyDelete(devCode); } } + if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("flume")) + { + String message = JsonConvert.SerializeObject(new Json("SetResponse", devName, devCode, + new JsonBody(devName + "ConfigSuccess"), getTimeStamp())); - Common.kafkaSetResponseProduce(session, devCode, devName); + if (Common.SendMessage(message)) + { + session.Logger.Info("往第三方发送数据:" + message); + } + else + { + session.Logger.Info("未连接上第三方服务器"); + } + } + else if (ConfigurationManager.AppSettings["flumeOrKafka"].Equals("kafka")) + { + Common.kafkaSetResponseProduce(session, devCode, devName); + } } //public static void sendSetResponse(CasicSession session, String devCode, String devName) diff --git a/SensorHub.Tube/Tube.cs b/SensorHub.Tube/Tube.cs index e75dc6c..c0e3181 100644 --- a/SensorHub.Tube/Tube.cs +++ b/SensorHub.Tube/Tube.cs @@ -52,7 +52,7 @@ //判断是返回的设置确认数据帧, 回复第三方 if (operType == "SetResponse") { - Common.sendSetResponse(session, devCode, "Tube"); + Common.sendSetResponse(session, devCode, devName); return; } @@ -191,8 +191,7 @@ } } - // Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); - Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); if (softwareVersion != "" || size != 0 || offset != 0)//进入远程升级流程 { diff --git a/SensorHub.Well/Well.cs b/SensorHub.Well/Well.cs index 22a75ff..48546da 100644 --- a/SensorHub.Well/Well.cs +++ b/SensorHub.Well/Well.cs @@ -41,7 +41,14 @@ session.Logger.Info("设备类型:" + devType); session.Logger.Info("操作类型:" + operType); session.Logger.Info("会话:" + session.HubAddr + "," + session.SessionID); - + + //判断是返回的设置确认数据帧, 回复第三方 + if (operType == "SetResponse") + { + Common.sendSetResponse(session, devCode, devName); + return; + } + List tags = Common.getTags(settings, session); //具体业务处理 diff --git a/SensorHub.WellPlus/WellPlus.cs b/SensorHub.WellPlus/WellPlus.cs index aa858fc..9244735 100644 --- a/SensorHub.WellPlus/WellPlus.cs +++ b/SensorHub.WellPlus/WellPlus.cs @@ -130,24 +130,7 @@ } Common.sendMessage(session, devName, devCode, -1, eventList, datasList, startupList); - // Common.kafkaProduce(session, devName, devCode, -1, null, null, null, eventList, datasList, startupList); - /* - if (source != "433") //433井盖不要求回复 - { - byte[] btPdu = new byte[2]; //2个字节 - if (operType == "TrapRequest") - { - btPdu[0] = 0x05; - } - else if (operType == "StartupRequest") - { - btPdu[0] = 0x09; - } - - btPdu[1] = 0x86; - Common.sendConfig(session, devCode, routeFlag, source, btPdu); - }*/ byte[] btPdu = new byte[2]; //2个字节 btPdu[0] = Common.getRespOperType(operType, source == "433" ? true : false); btPdu[1] = 0x9D;