diff --git a/SensorHub.Noise/Noise.cs b/SensorHub.Noise/Noise.cs index 4fc83b4..ed689fe 100644 --- a/SensorHub.Noise/Noise.cs +++ b/SensorHub.Noise/Noise.cs @@ -70,6 +70,7 @@ List eventList = new List(); List datasList = new List(); + List corrDataList = new List(); List startupList = new List(); foreach (Tag tag in tags) @@ -290,11 +291,20 @@ AccelerationDatas.acclerationValueMap.Add(devCode, realValue); - // 向第三方发送数据 + // 向第三方发送数据(批产应用) string uptime = DateTime.Now.ToString("yyyyMMddHHmmss"); - datasList.Add(new NoiseDatasJson(uptime, sumNoise * 2, 0)); // 采样率为7500的一半 总噪声值×2 + NoiseDatasJson noiData = new NoiseDatasJson(uptime, sumNoise * 2, 0); + datasList.Add(noiData); // 采样率为7500的一半 总噪声值×2 Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + // 临时方案:向kafka发送数据(第三方集成) + Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + + // 临时方案二,向相关仪应用的kafka队列发送包含原始采样点值的数据 + noiData.noiseSampleValues = realValue; + corrDataList.Add(noiData); + Common.CorrKafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, corrDataList); + // 创建存盘线程 Thread savethread = new Thread(SavetoFlie) { diff --git a/SensorHub.Noise/Noise.cs b/SensorHub.Noise/Noise.cs index 4fc83b4..ed689fe 100644 --- a/SensorHub.Noise/Noise.cs +++ b/SensorHub.Noise/Noise.cs @@ -70,6 +70,7 @@ List eventList = new List(); List datasList = new List(); + List corrDataList = new List(); List startupList = new List(); foreach (Tag tag in tags) @@ -290,11 +291,20 @@ AccelerationDatas.acclerationValueMap.Add(devCode, realValue); - // 向第三方发送数据 + // 向第三方发送数据(批产应用) string uptime = DateTime.Now.ToString("yyyyMMddHHmmss"); - datasList.Add(new NoiseDatasJson(uptime, sumNoise * 2, 0)); // 采样率为7500的一半 总噪声值×2 + NoiseDatasJson noiData = new NoiseDatasJson(uptime, sumNoise * 2, 0); + datasList.Add(noiData); // 采样率为7500的一半 总噪声值×2 Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + // 临时方案:向kafka发送数据(第三方集成) + Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + + // 临时方案二,向相关仪应用的kafka队列发送包含原始采样点值的数据 + noiData.noiseSampleValues = realValue; + corrDataList.Add(noiData); + Common.CorrKafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, corrDataList); + // 创建存盘线程 Thread savethread = new Thread(SavetoFlie) { diff --git a/SensorHub.Noise/NoiseDatasJson.cs b/SensorHub.Noise/NoiseDatasJson.cs index 9a6e69b..a5a2daa 100644 --- a/SensorHub.Noise/NoiseDatasJson.cs +++ b/SensorHub.Noise/NoiseDatasJson.cs @@ -12,6 +12,8 @@ public double noiseVal { get; set; } //噪声幅度值 public int noiseFreq { get; set; } //噪声频率值 + public double[] noiseSampleValues { get; set; } // 采样点原始数据 + public NoiseDatasJson(string uptime, double noiseVal, int noiseFreq) { this.uptime = uptime; diff --git a/SensorHub.Noise/Noise.cs b/SensorHub.Noise/Noise.cs index 4fc83b4..ed689fe 100644 --- a/SensorHub.Noise/Noise.cs +++ b/SensorHub.Noise/Noise.cs @@ -70,6 +70,7 @@ List eventList = new List(); List datasList = new List(); + List corrDataList = new List(); List startupList = new List(); foreach (Tag tag in tags) @@ -290,11 +291,20 @@ AccelerationDatas.acclerationValueMap.Add(devCode, realValue); - // 向第三方发送数据 + // 向第三方发送数据(批产应用) string uptime = DateTime.Now.ToString("yyyyMMddHHmmss"); - datasList.Add(new NoiseDatasJson(uptime, sumNoise * 2, 0)); // 采样率为7500的一半 总噪声值×2 + NoiseDatasJson noiData = new NoiseDatasJson(uptime, sumNoise * 2, 0); + datasList.Add(noiData); // 采样率为7500的一半 总噪声值×2 Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + // 临时方案:向kafka发送数据(第三方集成) + Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + + // 临时方案二,向相关仪应用的kafka队列发送包含原始采样点值的数据 + noiData.noiseSampleValues = realValue; + corrDataList.Add(noiData); + Common.CorrKafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, corrDataList); + // 创建存盘线程 Thread savethread = new Thread(SavetoFlie) { diff --git a/SensorHub.Noise/NoiseDatasJson.cs b/SensorHub.Noise/NoiseDatasJson.cs index 9a6e69b..a5a2daa 100644 --- a/SensorHub.Noise/NoiseDatasJson.cs +++ b/SensorHub.Noise/NoiseDatasJson.cs @@ -12,6 +12,8 @@ public double noiseVal { get; set; } //噪声幅度值 public int noiseFreq { get; set; } //噪声频率值 + public double[] noiseSampleValues { get; set; } // 采样点原始数据 + public NoiseDatasJson(string uptime, double noiseVal, int noiseFreq) { this.uptime = uptime; diff --git a/SensorHub.Servers/Common.cs b/SensorHub.Servers/Common.cs index fb75705..843654a 100644 --- a/SensorHub.Servers/Common.cs +++ b/SensorHub.Servers/Common.cs @@ -594,6 +594,38 @@ } + public static void CorrKafkaProduce(CasicSession session, String devName, String devCode, int cell, + int? pci, int? rsrp, int? snr, List datasList) + { + DateTime now = DateTime.Now; + String logtime = now.ToString("yyyyMMddHHmmss"); + + DateTime startTime = TimeZone.CurrentTimeZone.ToLocalTime(new System.DateTime(1970, 1, 1)); //当地时区 + long timeStamp = (long)(now - startTime).TotalMilliseconds; //相差毫秒数 + + string brokers = ConfigurationManager.AppSettings["corrKafkaBrokers"]; + string topic = ConfigurationManager.AppSettings["corrKafkaBrokers"]; + + try + { + //往第三方发送数据 + if (datasList != null && datasList.Count > 0) + { + var jsonSetting = new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore }; + string message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, pci, rsrp, snr, datasList, logtime, null), timeStamp)); + + KafkaUtils.produce(brokers, topic, message); + + session.Logger.Info("往kafka写入数据:" + message); + } + } + catch (Exception ex) + { + session.Logger.Error("往kafka写入数据出错:" + ex.Message); + } + + } + public static void kafkaSetResponseProduce(CasicSession session, String devCode, String devName) { String message = JsonConvert.SerializeObject(new Json("SetResponse", devName, devCode, diff --git a/SensorHub.Noise/Noise.cs b/SensorHub.Noise/Noise.cs index 4fc83b4..ed689fe 100644 --- a/SensorHub.Noise/Noise.cs +++ b/SensorHub.Noise/Noise.cs @@ -70,6 +70,7 @@ List eventList = new List(); List datasList = new List(); + List corrDataList = new List(); List startupList = new List(); foreach (Tag tag in tags) @@ -290,11 +291,20 @@ AccelerationDatas.acclerationValueMap.Add(devCode, realValue); - // 向第三方发送数据 + // 向第三方发送数据(批产应用) string uptime = DateTime.Now.ToString("yyyyMMddHHmmss"); - datasList.Add(new NoiseDatasJson(uptime, sumNoise * 2, 0)); // 采样率为7500的一半 总噪声值×2 + NoiseDatasJson noiData = new NoiseDatasJson(uptime, sumNoise * 2, 0); + datasList.Add(noiData); // 采样率为7500的一半 总噪声值×2 Common.sendMessage(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + // 临时方案:向kafka发送数据(第三方集成) + Common.kafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, eventList, datasList, startupList); + + // 临时方案二,向相关仪应用的kafka队列发送包含原始采样点值的数据 + noiData.noiseSampleValues = realValue; + corrDataList.Add(noiData); + Common.CorrKafkaProduce(session, devName, devCode, cell, pci, rsrp, snr, corrDataList); + // 创建存盘线程 Thread savethread = new Thread(SavetoFlie) { diff --git a/SensorHub.Noise/NoiseDatasJson.cs b/SensorHub.Noise/NoiseDatasJson.cs index 9a6e69b..a5a2daa 100644 --- a/SensorHub.Noise/NoiseDatasJson.cs +++ b/SensorHub.Noise/NoiseDatasJson.cs @@ -12,6 +12,8 @@ public double noiseVal { get; set; } //噪声幅度值 public int noiseFreq { get; set; } //噪声频率值 + public double[] noiseSampleValues { get; set; } // 采样点原始数据 + public NoiseDatasJson(string uptime, double noiseVal, int noiseFreq) { this.uptime = uptime; diff --git a/SensorHub.Servers/Common.cs b/SensorHub.Servers/Common.cs index fb75705..843654a 100644 --- a/SensorHub.Servers/Common.cs +++ b/SensorHub.Servers/Common.cs @@ -594,6 +594,38 @@ } + public static void CorrKafkaProduce(CasicSession session, String devName, String devCode, int cell, + int? pci, int? rsrp, int? snr, List datasList) + { + DateTime now = DateTime.Now; + String logtime = now.ToString("yyyyMMddHHmmss"); + + DateTime startTime = TimeZone.CurrentTimeZone.ToLocalTime(new System.DateTime(1970, 1, 1)); //当地时区 + long timeStamp = (long)(now - startTime).TotalMilliseconds; //相差毫秒数 + + string brokers = ConfigurationManager.AppSettings["corrKafkaBrokers"]; + string topic = ConfigurationManager.AppSettings["corrKafkaBrokers"]; + + try + { + //往第三方发送数据 + if (datasList != null && datasList.Count > 0) + { + var jsonSetting = new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore }; + string message = JsonConvert.SerializeObject(new Json("Data", devName, devCode, new DataJson(devName + "Data", cell, pci, rsrp, snr, datasList, logtime, null), timeStamp)); + + KafkaUtils.produce(brokers, topic, message); + + session.Logger.Info("往kafka写入数据:" + message); + } + } + catch (Exception ex) + { + session.Logger.Error("往kafka写入数据出错:" + ex.Message); + } + + } + public static void kafkaSetResponseProduce(CasicSession session, String devCode, String devName) { String message = JsonConvert.SerializeObject(new Json("SetResponse", devName, devCode, diff --git a/SensorHub.Servers/KafkaUtils.cs b/SensorHub.Servers/KafkaUtils.cs index 6d8a85f..af813cf 100644 --- a/SensorHub.Servers/KafkaUtils.cs +++ b/SensorHub.Servers/KafkaUtils.cs @@ -31,5 +31,22 @@ p.Flush(TimeSpan.FromSeconds(10)); } } + + public static void produce(string brokers, string topic, string message) + { + var conf = new ProducerConfig { BootstrapServers = brokers }; + + Action> handler = r => + Console.WriteLine(!r.Error.IsError ? "Delivered message to {r.TopicPartitionOffset}" : "Delivery Error: {r.Error.Reason}"); + + using (var p = new ProducerBuilder(conf).Build()) + { + + p.Produce(topic, new Message { Value = message }, handler); + + // wait for up to 10 seconds for any inflight messages to be delivered. + p.Flush(TimeSpan.FromSeconds(10)); + } + } } }