#include "PhaseDevice.h"
#include <QtMath>
#include <QDateTime>
#include <QThread>
#include <QTimer>
#include <QDebug>
#include <iostream>
#include "common/ConstCache.h"

namespace {

// Number of averaging times (tau = 1 s, 10 s, 100 s, 1000 s, 10000 s) for which
// the Allan deviation is evaluated per channel.
constexpr int kAllanTauCount = 5;

// Decides whether this instance may publish measurement results to Kafka.
// The master instance (MASTER != 0) always publishes; the standby instance
// only publishes when the last master heartbeat is more than 5 seconds old.
bool mayPublishResults()
{
    if (SettingConfig::getInstance().MASTER != 0) {
        return true;
    }
    return QDateTime::currentDateTime().toSecsSinceEpoch()
               - ConstCache::getInstance().latestHeartTs > 5;
}

} // namespace

/**
 * Wires the serial port to the frame handler, optionally prepares the Kafka
 * producer, and sizes all per-channel containers to PHASE_MESSURE_CHANNEL.
 */
PhaseDevice::PhaseDevice(QObject *parent) : QObject(parent)
{
    connect(&this->serialUtil, &QSerialPortUtil::dataRecieved,
            this, &PhaseDevice::dataReceivedHandler);

    if (SettingConfig::getInstance().NEED_KAFKA == 1)
    {
        kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS);
        kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC);
        kafkaUtil.createProducer();
    }

    // One (initially empty) phase sample buffer per channel.
    QVector<QVector<double>> initPhase(PHASE_MESSURE_CHANNEL, QVector<double>(0, 0));
    phaseVector = initPhase;

    // Timestamp (ms since epoch) of the latest sample per channel; starts at 0.
    QVector<qulonglong> initTs(PHASE_MESSURE_CHANNEL);
    latestTsVector = initTs;

    // One Allan deviation value per channel and averaging time.
    QVector<QVector<double>> initAllen(PHASE_MESSURE_CHANNEL, QVector<double>(kAllanTauCount, 0));
    channelAllen = initAllen;

    // Display strings for the Allan results of every channel.
    QVector<QStringList> initResult(PHASE_MESSURE_CHANNEL);
    channelAllenResultStr = initResult;

    connect(this, &PhaseDevice::calculateAllen, this, &PhaseDevice::onCalculateAllen);
}

/** Detaches the serial data handler before the object goes away. */
PhaseDevice::~PhaseDevice()
{
    disconnect(&this->serialUtil, &QSerialPortUtil::dataRecieved,
               this, &PhaseDevice::dataReceivedHandler);
}

/** Sets the serial port name(s); a comma separated "master,standby" pair is accepted. */
void PhaseDevice::setComName(QString comName)
{
    this->comName = comName;
}

/** Sets the baud rate used when the serial port is opened. */
void PhaseDevice::setBaudRate(int baudRate)
{
    this->baudRate = baudRate;
}

/** Returns the device code used in log file names and messages. */
QString PhaseDevice::getDevCode()
{
    return this->devCode;
}

/** Sets the device code used in log file names and messages. */
void PhaseDevice::setDevCode(QString devCode)
{
    this->devCode = devCode;
}

/** Returns the device id attached to every outgoing message. */
QString PhaseDevice::getDeviceId()
{
    return this->deviceId;
}

/** Sets the device id attached to every outgoing message. */
void PhaseDevice::setDeviceId(QString deviceId)
{
    this->deviceId = deviceId;
}

/** Reports whether the underlying serial port is currently open. */
bool PhaseDevice::isSerialOpen()
{
    return this->serialUtil.isOpen();
}

/**
 * Drops the accumulated phase samples of one channel.
 *
 * @param channelNo 1-based channel number; out-of-range values are ignored.
 */
void PhaseDevice::clearChannelPhaseData(int channelNo)
{
    if (channelNo < 1 || channelNo > phaseVector.size())
    {
        return;
    }
    this->phaseVector[channelNo - 1].clear();
}

/**
 * Opens the serial port that belongs to this instance's role.
 *
 * The configured name is split on ','. The master instance (MASTER == 1)
 * uses the first entry, any other instance uses the second one.
 * Nothing is opened when the required entry is missing or empty.
 */
void PhaseDevice::initSerialPort()
{
    const int master = SettingConfig::getInstance().MASTER;
    const QStringList comNameList = this->comName.split(",");

    // split() never yields an empty list (an empty string gives one empty
    // entry), so validate the concrete entry that is going to be used.
    const int portIndex = (master == 1) ? 0 : 1;
    if (portIndex >= comNameList.size() || comNameList.at(portIndex).isEmpty())
    {
        qWarning() << "PhaseDevice: no serial port configured at index" << portIndex
                   << "in" << this->comName;
        return;
    }

    this->serialUtil.openSerialPort(comNameList.at(portIndex), this->baudRate);
}

/**
 * Sends the "start measurement" command three times, 100 ms apart, and then
 * discards all phase samples accumulated so far.
 */
void PhaseDevice::startWork()
{
    const QString startCmd = PhaseProtocolBM::startMessure();
    for (int attempt = 0; attempt < 3; attempt++)
    {
        this->serialUtil.sendData(startCmd.toLocal8Bit());
        QThread::msleep(100);
    }

    // A new measurement run starts from empty sample buffers.
    for (int i = 0; i < phaseVector.size(); i++)
    {
        phaseVector[i].clear();
    }
}

/** Sends the "stop measurement" command three times, 100 ms apart. */
void PhaseDevice::stopWork()
{
    const QString stopCmd = PhaseProtocolBM::stopMessure();
    for (int attempt = 0; attempt < 3; attempt++)
    {
        this->serialUtil.sendData(stopCmd.toLocal8Bit());
        QThread::msleep(100);
    }
}

/**
 * Accumulates incoming serial bytes and processes the buffer once it forms a
 * valid frame.
 *
 * @param data bytes just received from the serial port.
 */
void PhaseDevice::dataReceivedHandler(QByteArray data)
{
    this->dataBuff.append(data);

    if (PhaseProtocolBM::checkFrame(this->dataBuff) == true)
    {
        // The DTO is only needed once a complete frame is available.
        PhaseDataDto *phaseData = new PhaseDataDto(this);
        phaseData->rawFrame = this->dataBuff;

        // Parse the raw frame into the data object.
        const bool parsed = PhaseProtocolBM::parseMessureData(this->dataBuff, phaseData);
        if (parsed == true)
        {
            const QDateTime now = QDateTime::currentDateTime();
            phaseData->devCode = this->devCode;
            phaseData->timestamp = now.toString("yyyy-MM-dd HH:mm:ss.zzz");
            phaseData->milisecond = now.toMSecsSinceEpoch();

            this->afterFramePhase(phaseData);
        }
        // NOTE(review): when parsing fails the buffer is left untouched and is
        // only discarded later by the oversize branch below - confirm intended.

        // The DTO is parented to this object, so without an explicit delete it
        // would stay alive until this object is destroyed and memory would
        // keep growing with every frame.
        delete phaseData;
    }
    else if (this->dataBuff.size() > PHASE_FRAM_LENGTH)
    {
        // More bytes than one frame but still no valid frame: drop everything.
        std::cout << QString("%1").arg(this->dataBuff.size()).toStdString() << std::endl;
        this->dataBuff.clear();
    }
}

/**
 * Handles one successfully parsed frame: writes the log files, updates the
 * per-channel sample buffers, publishes to Kafka and notifies the UI.
 *
 * @param phaseData parsed frame; owned by the caller and only valid for the
 *                  duration of this call.
 */
void PhaseDevice::afterFramePhase(PhaseDataDto * phaseData)
{
    // 1. Clear the receive buffer so the next frame starts from scratch.
    this->dataBuff.clear();

    // 2. Write the log files.
    const QString date = phaseData->timestamp.mid(0, 10);
    const int hour = phaseData->timestamp.mid(11, 2).toInt();

    // 2.1 Raw frame bytes as a hex string.
    const QString filename = "raw_" + devCode + ".log";
    const QString content = phaseData->timestamp + " " + QByteUtil::binToHexString(phaseData->rawFrame);
    QLogUtil::writeRawDataLogByDate(date, filename, content);

    // Payload for the message queue; the message log is split into four
    // 6-hour files per day.
    QJsonArray messageArray;
    QString msgLogFilename("msg_%1-%2(%3-%4).log");
    msgLogFilename = msgLogFilename.arg(devCode).arg(QString::number(hour / 6 + 1));
    msgLogFilename = msgLogFilename.arg((hour / 6) * 6, 2, 10, QLatin1Char('0'));
    msgLogFilename = msgLogFilename.arg((hour / 6) * 6 + 5, 2, 10, QLatin1Char('0'));

    // 2.2 Per-channel phase data.
    // Never index past the per-channel containers sized in the constructor.
    const int channelCount = qMin(static_cast<int>(phaseData->channelActive.size()),
                                  qMin(static_cast<int>(phaseVector.size()),
                                       static_cast<int>(latestTsVector.size())));

    // NOTE: the sample buffers are modified without locking. The previous
    // function-local QMutex guarded nothing (each call created its own mutex).
    // If these buffers are ever shared across threads, a member mutex is needed.
    for (int idx = 0; idx < channelCount; idx++)
    {
        const int channelNo = idx + 1;
        auto &samples = phaseVector[idx];

        // Elapsed time since the latest sample of this channel. Kept in 64 bit:
        // a 32-bit value overflows for millisecond epoch differences.
        const qint64 nowTs = QDateTime::currentMSecsSinceEpoch();
        const qint64 latestTs = static_cast<qint64>(latestTsVector.at(idx));
        const qint64 tsDelta = nowTs - latestTs;

        // The channel was offline longer than the threshold: restart the
        // accumulation so stability is computed on continuous data only.
        if (tsDelta > SettingConfig::getInstance().OFFLINE_THRE * 1000 && samples.size() > 0)
        {
            samples.clear();
        }

        if (phaseData->channelActive.at(idx).toUInt() != 1)
        {
            continue;
        }

        // Channel log.
        const QString chFilename = QString("%1_CH_%2.log").arg(devCode).arg(channelNo, 2, 10, QLatin1Char('0'));
        const QString channelDataStr = QString("%1 %2").arg(phaseData->timestamp).arg(phaseData->channelDataStr.at(idx));
        QLogUtil::writeChannelDataLogByDate(date, chFilename, channelDataStr);

        QJsonObject jsonObj = phaseData->toJSON(idx);
        jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
        jsonObj.insert("master", SettingConfig::getInstance().MASTER);
        jsonObj.insert("deviceId", deviceId);
        messageArray.append(jsonObj);

        // Store the sample for the Allan deviation and keep only the newest
        // MAX_DATA_SIZE samples.
        samples.append(phaseData->channelData.at(idx));
        latestTsVector[idx] = phaseData->milisecond;
        if (samples.size() > SettingConfig::getInstance().MAX_DATA_SIZE)
        {
            samples = samples.mid(samples.size() - SettingConfig::getInstance().MAX_DATA_SIZE);
        }
    }

    // 3. Publish to the middleware for downstream processing.
    QJsonObject statusObj = phaseData->toStatusJSON();
    statusObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
    statusObj.insert("deviceId", deviceId);
    statusObj.insert("master", SettingConfig::getInstance().MASTER);

    // Serialise once; used for both Kafka and the message log.
    const QString messageJson = QString(QJsonDocument(messageArray).toJson(QJsonDocument::Compact));

    if (SettingConfig::getInstance().NEED_KAFKA == 1)
    {
        // The standby instance only sends phase data while the master is unhealthy.
        if (mayPublishResults())
        {
            kafkaUtil.produceMessage(messageJson);
        }

        // Device status is sent for frames handled in a second divisible by 10.
        if (QDateTime::currentDateTime().time().second() % 10 == 0)
        {
            kafkaUtil.produceMessage(SettingConfig::getInstance().KAFKA_STATUS_TOPIC,
                                     QString(QJsonDocument(statusObj).toJson(QJsonDocument::Compact)));
        }
    }

    QLogUtil::writeMessageLogByDate(date, msgLogFilename, messageJson);

    // 4. Show the phase data in the UI.
    emit this->sendDataToDraw(phaseData);

    // Trigger the Allan deviation calculation and its Kafka message.
    emit calculateAllen(phaseData->milisecond);
}

/**
 * Recomputes the Allan deviation of every channel for tau = 1 s ... 10000 s,
 * pushes the result to the UI and, when enabled, to Kafka.
 *
 * @param milisecond timestamp (ms since epoch) of the frame that triggered the
 *                   calculation; truncated to whole seconds in the message.
 */
void PhaseDevice::onCalculateAllen(qlonglong milisecond)
{
    QJsonArray performArray; // message payload

    for (int i = 0; i < phaseVector.size(); i++)
    {
        QStringList result;                                        // strings shown in the UI
        const int size = static_cast<int>(phaseVector[i].size());  // accumulated sample count

        QJsonObject performData; // stability data of this channel
        performData.insert("channelNo", i + 1);
        performData.insert("ts", (milisecond / 1000) * 1000);
        performData.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
        performData.insert("master", SettingConfig::getInstance().MASTER);
        performData.insert("deviceId", deviceId);

        QJsonArray dataArr;
        int tau = 1; // 10^j, kept as an integer to avoid repeated qPow calls
        for (int j = 0; j < kAllanTauCount; j++, tau *= 10)
        {
            // At least 5 * tau samples are required, otherwise 0 is reported.
            if (size >= 5 * tau)
            {
                channelAllen[i][j] = calAllan(i, tau, size);
            }
            else
            {
                channelAllen[i][j] = 0.0;
            }

            const QString allenStr = QString::number(channelAllen[i][j], 'e', 4);
            const QString countStr = QString::number(size / tau);

            // UI result: [Allan deviation, sample count] per tau.
            result << allenStr << countStr;

            // Kafka payload.
            QJsonObject dataObj;
            dataObj.insert("tau", QString::number(tau));
            dataObj.insert("allen", allenStr);
            dataObj.insert("count", countStr);
            dataArr.append(dataObj);
        }

        performData.insert("data", dataArr);
        performArray.append(performData);

        channelAllenResultStr[i] = result;
    }

    // Show in the UI.
    emit this->sendAllenToDraw(devCode, channelAllenResultStr);

    // Publish the stability result when handled in a second divisible by 5;
    // the standby instance only publishes while the master is unhealthy.
    if (SettingConfig::getInstance().NEED_KAFKA == 1
        && QDateTime::currentDateTime().time().second() % 5 == 0
        && mayPublishResults())
    {
        kafkaUtil.produceMessage(SettingConfig::getInstance().KAFKA_PERFORM_TOPIC,
                                 QString(QJsonDocument(performArray).toJson(QJsonDocument::Compact)));
    }
}

/**
 * Computes the Allan deviation of one channel from its buffered phase samples
 * using second differences: sqrt( sum((x[i+2d] - 2*x[i+d] + x[i])^2)
 *                                 / (2 * (d*tau0)^2 * (aN - 2d)) ).
 *
 * @param index 0-based channel index into the sample buffers.
 * @param d     averaging factor (tau = d * tau0, with tau0 = 1 s).
 * @param aN    number of samples to use (clamped to the buffered amount).
 * @return the Allan deviation, or 0.0 when the arguments are invalid or there
 *         are not enough samples (aN <= 2 * d).
 */
double PhaseDevice::calAllan(int index, int d, int aN)
{
    if (index < 0 || index >= phaseVector.size() || d <= 0)
    {
        return 0.0;
    }

    const auto &samples = phaseVector.at(index);
    const int sampleCount = qMin(aN, static_cast<int>(samples.size()));
    const int termCount = sampleCount - 2 * d;
    if (termCount <= 0)
    {
        return 0.0;
    }

    // tau0 is the basic sampling interval of the instrument (minimum 1 s).
    const double tau0 = 1;
    const double tau = d * tau0;
    const double tauSquared = tau * tau;

    double sum = 0.0;
    for (int i = 0; i < termCount; i++)
    {
        const double secondDiff = samples.at(i + 2 * d) - 2 * samples.at(i + d) + samples.at(i);
        sum += secondDiff * secondDiff;
    }

    // sum / (2 * tau^2 * terms) is the Allan variance; its root the deviation.
    return qSqrt(sum / (2 * tauSquared * termCount));
}