Newer
Older
ZXSSCJ / PhaseCompAcq / PhaseDevice.cpp
#include "PhaseDevice.h"
#include <QtMath>
#include <QDateTime>
#include <QThread>
#include <QTimer>
#include <QDebug>
#include <iostream>
#include "common/ConstCache.h"

/**
 * @brief Constructs a phase device and prepares its per-channel buffers.
 *
 * Wires the serial helper's data signal to the frame handler, creates the
 * Kafka producer when NEED_KAFKA is 1, and sizes every per-channel container
 * to PHASE_MESSURE_CHANNEL entries.
 *
 * @param parent Owning QObject, forwarded to the QObject base class.
 */
PhaseDevice::PhaseDevice(QObject *parent) : QObject(parent)
{
    // Every chunk read from the serial port goes through dataReceivedHandler.
    connect(&serialUtil, &QSerialPortUtil::dataRecieved,
            this, &PhaseDevice::dataReceivedHandler);

    // The producer is only set up when the configuration enables Kafka.
    if (SettingConfig::getInstance().NEED_KAFKA == 1) {
        kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS);
        kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC);
        kafkaUtil.createProducer();
    }

    // One initially empty phase-sample history per channel.
    phaseVector = QVector<QVector<double>>(PHASE_MESSURE_CHANNEL);

    // Timestamp of the most recent sample of each channel.
    latestTsVector = QVector<qulonglong>(PHASE_MESSURE_CHANNEL);

    // Five results per channel, all starting at zero
    // (onCalculateAllen fills them for tau = 1, 10, 100, 1000, 10000).
    channelAllen = QVector<QVector<double>>(PHASE_MESSURE_CHANNEL, QVector<double>(5, 0.0));

    // Display strings for each channel's results.
    channelAllenResultStr = QVector<QStringList>(PHASE_MESSURE_CHANNEL);

    // The stability calculation is triggered through a signal on this object.
    connect(this, &PhaseDevice::calculateAllen, this, &PhaseDevice::onCalculateAllen);
}

/**
 * @brief Detaches the frame handler from the serial helper before destruction.
 */
PhaseDevice::~PhaseDevice()
{
    QObject::disconnect(&serialUtil, &QSerialPortUtil::dataRecieved,
                        this, &PhaseDevice::dataReceivedHandler);
}

/**
 * @brief Stores the serial port name(s) of this device.
 *
 * initSerialPort() splits this value on ',' and uses entry 0 for the master
 * instance and entry 1 for the backup instance.
 *
 * @param comName Comma-separated port names.
 */
void PhaseDevice::setComName(QString comName)
{
    this->comName = comName;
}
/**
 * @brief Stores the baud rate that initSerialPort() passes to the serial helper.
 * @param baudRate Baud rate used when opening the port.
 */
void PhaseDevice::setBaudRate(int baudRate)
{
    this->baudRate = baudRate;
}
/**
 * @brief Returns the device code.
 * @return The code used in log file names and attached to parsed frames.
 */
QString PhaseDevice::getDevCode()
{
    return devCode;
}
/**
 * @brief Stores the device code.
 * @param devCode Code used in log file names and attached to parsed frames.
 */
void PhaseDevice::setDevCode(QString devCode)
{
    this->devCode = devCode;
}
/**
 * @brief Returns the device identifier.
 * @return The value published as "deviceId" in outgoing JSON messages.
 */
QString PhaseDevice::getDeviceId()
{
    return deviceId;
}
/**
 * @brief Stores the device identifier.
 * @param deviceId Value published as "deviceId" in outgoing JSON messages.
 */
void PhaseDevice::setDeviceId(QString deviceId)
{
    this->deviceId = deviceId;
}

bool PhaseDevice::isSerialOpen()
{
    return this->serialUtil.isOpen();
}

/**
 * @brief Discards the accumulated phase samples of one channel.
 *
 * Channel numbers outside 1..phaseVector.size() are ignored instead of
 * indexing the container out of range.
 *
 * @param channelNo 1-based channel number.
 */
void PhaseDevice::clearChannelPhaseData(int channelNo)
{
    const int index = channelNo - 1;
    if (index < 0 || index >= this->phaseVector.size())
    {
        return;
    }

    this->phaseVector[index].clear();
}

void PhaseDevice::initSerialPort()
{   
    int master = SettingConfig::getInstance().MASTER;
    QStringList comNameList = this->comName.split(",");
    if (comNameList.isEmpty() == true)
    {
        return;
    }

    // 如果是主程序则打开主串口 备程序则打开备串口
    if (master == 1)
    {
        this->serialUtil.openSerialPort(comNameList.at(0), this->baudRate);
    } else
    {
        this->serialUtil.openSerialPort(comNameList.at(1), this->baudRate);
    }
}

void PhaseDevice::startWork()
{
    QString startCmd = PhaseProtocolBM::startMessure();
    for (int i = 0; i < 3; i++) {
        this->serialUtil.sendData(startCmd.toLocal8Bit());
        QThread::msleep(100);
    }

    // 开始计算后清除累积的相位数据和准确度计算结果
    for (int i = 0; i < phaseVector.size(); i++) {
        phaseVector[i].clear();
    }
}

void PhaseDevice::stopWork()
{
    QString stopCmd = PhaseProtocolBM::stopMessure();
    for (int i = 0; i < 3; i++) {
        this->serialUtil.sendData(stopCmd.toLocal8Bit());
        QThread::msleep(100);
    }
}

/**
 * @brief Accumulates serial data and processes a frame once it is complete.
 *
 * The received bytes are appended to dataBuff. When the buffer passes
 * PhaseProtocolBM::checkFrame it is parsed into a PhaseDataDto, stamped with
 * the device code and the current time, and handed to afterFramePhase().
 * When the buffer is not a valid frame and has grown beyond
 * PHASE_FRAM_LENGTH, its size is printed and the buffer is discarded.
 *
 * @param data Bytes just read from the serial port.
 */
void PhaseDevice::dataReceivedHandler(QByteArray data)
{
    this->dataBuff.append(data);

//    std::cout << QByteUtil::binToHexString(this->dataBuff).toStdString() << std::endl;

    if (PhaseProtocolBM::checkFrame(this->dataBuff) == true)
    {
        // The DTO is only needed for a complete frame. It lives on the stack
        // so it is released when this scope ends: it is parented to `this`,
        // so a heap instance left undeleted would stay alive (and accumulate)
        // until the device itself is destroyed.
        // NOTE(review): afterFramePhase() emits a pointer to this object; that
        // is only valid while this function runs, so receivers must not be
        // connected through a queued connection — confirm at the connect sites.
        PhaseDataDto phaseData(this);
        phaseData.rawFrame = this->dataBuff;

        // Parse the raw frame into the data object.
        const bool parsed = PhaseProtocolBM::parseMessureData(this->dataBuff, &phaseData);

        // NOTE(review): when parsing fails the buffer is left untouched, as in
        // the previous implementation — confirm whether it should be cleared.
        if (parsed)
        {
            const QDateTime now = QDateTime::currentDateTime();
            phaseData.devCode = this->devCode;
            phaseData.timestamp = now.toString("yyyy-MM-dd HH:mm:ss.zzz");
            phaseData.milisecond = now.toMSecsSinceEpoch();
            this->afterFramePhase(&phaseData);
        }
    }
    else if (this->dataBuff.size() > PHASE_FRAM_LENGTH)
    {
        // Oversized buffer that is not a valid frame: report its size and drop it.
        std::cout << this->dataBuff.size() << std::endl;
        this->dataBuff.clear();
    }
}


/**
 * @brief Post-processes one successfully parsed measurement frame.
 *
 * Steps, in order:
 *  1. clear the receive buffer;
 *  2. write the raw frame and each active channel's value to the log files,
 *     and append each active channel's sample to its history;
 *  3. publish the channel data (and, in seconds divisible by 10, the device
 *     status) to Kafka when NEED_KAFKA is 1, then log the message;
 *  4. emit sendDataToDraw and calculateAllen.
 *
 * @param phaseData Parsed frame; its timestamp is expected in the
 *                  "yyyy-MM-dd HH:mm:ss.zzz" layout written by
 *                  dataReceivedHandler().
 */
void PhaseDevice::afterFramePhase(PhaseDataDto * phaseData)
{
    // 1. Clear dataBuff and wait for the next frame.
    this->dataBuff.clear();

    // 2. Write to the log files.
    QString date = phaseData->timestamp.mid(0, 10);
    int hour = phaseData->timestamp.mid(11, 2).toInt();

    // 2.1 Raw frame bytes.
    QString filename = "raw_" + devCode + ".log";
    QString content = phaseData->timestamp + " " + QByteUtil::binToHexString(phaseData->rawFrame);
    QLogUtil::writeRawDataLogByDate(date, filename, content);

    // Content sent to the message queue. The message log is split into four
    // files per day, one per six-hour window of the frame's hour.
    QJsonArray messageArray;
    QString msgLogFilename("msg_%1-%2(%3-%4).log");
    msgLogFilename = msgLogFilename.arg(devCode).arg(QString::number(hour / 6 + 1));
    msgLogFilename = msgLogFilename.arg((hour / 6) * 6, 2, 10, QLatin1Char('0'));
    msgLogFilename = msgLogFilename.arg((hour / 6) * 6 + 5, 2, 10, QLatin1Char('0'));

    // Never index the per-channel containers beyond the channels they hold,
    // even if a frame reports more channels.
    const int trackedChannels = qMin(phaseVector.size(), latestTsVector.size());

    // NOTE(review): the per-channel containers are modified here without a
    // lock. The function-local QMutex objects used previously could not
    // synchronise anything (each call locked its own private mutex); if
    // another thread accesses phaseVector, a mutex shared through the class
    // is required.

    // 2.2 Phase difference data of each channel (i is the 1-based channel number).
    for (int i = 1; i <= phaseData->channelActive.size() && i <= trackedChannels; i++)
    {
        // Time elapsed since the latest sample of this channel, kept in
        // 64 bits so that long gaps cannot wrap around.
        const qint64 nowTs = QDateTime::currentMSecsSinceEpoch();
        const qint64 latestTs = static_cast<qint64>(latestTsVector.at(i - 1));
        const qint64 tsDelta = nowTs - latestTs;

        // Beyond the channel-offline threshold the history is dropped so that
        // stability is only computed from continuous data.
        if (tsDelta > SettingConfig::getInstance().OFFLINE_THRE * 1000 && phaseVector[i - 1].size() > 0) {
            phaseVector[i - 1] = QVector<double>();
        }

        if (phaseData->channelActive.at(i-1).toUInt() == 1)
        {
            // Per-channel data log.
            QString chFilename = QString("%1_CH_%2.log").arg(devCode).arg(i, 2, 10, QLatin1Char('0'));
            QString channelDataStr = QString("%1 %2").arg(phaseData->timestamp).arg(phaseData->channelDataStr.at(i-1));

            QLogUtil::writeChannelDataLogByDate(date, chFilename, channelDataStr);

            QJsonObject jsonObj = phaseData->toJSON(i - 1);
            jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
            jsonObj.insert("master", SettingConfig::getInstance().MASTER);
            jsonObj.insert("deviceId", deviceId);
            messageArray.append(jsonObj);

            // Append the sample to the history used for the Allan calculation
            // and keep only the newest MAX_DATA_SIZE samples.
            phaseVector[i - 1].append(phaseData->channelData.at(i - 1));
            latestTsVector[i - 1] = phaseData->milisecond;
            if (phaseVector[i - 1].size() > SettingConfig::getInstance().MAX_DATA_SIZE) {
                phaseVector[i - 1] = phaseVector[i - 1].mid(phaseVector[i - 1].size() - SettingConfig::getInstance().MAX_DATA_SIZE);
            }
        }
    }

    // 3. Publish to the middleware for downstream processing.
    QJsonObject statusObj = phaseData->toStatusJSON();
    statusObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
    statusObj.insert("deviceId", deviceId);
    statusObj.insert("master", SettingConfig::getInstance().MASTER);

    // Serialise the channel data once; it is used for Kafka and for the log.
    const QString messagePayload = QString(QJsonDocument(messageArray).toJson(QJsonDocument::Compact));

    if (SettingConfig::getInstance().NEED_KAFKA == 1)
    {
        if (SettingConfig::getInstance().MASTER == 0)
        {
            // The backup instance only publishes phase data when the latest
            // master heartbeat is more than 5 seconds old.
            if (QDateTime::currentDateTime().toSecsSinceEpoch() - ConstCache::getInstance().latestHeartTs > 5) {
                kafkaUtil.produceMessage(messagePayload);
            }
        } else {
            kafkaUtil.produceMessage(messagePayload);
        }

        // Device status is published when the current second is a multiple of 10.
        if (QDateTime::currentDateTime().time().second() % 10 == 0) {
            kafkaUtil.produceMessage(SettingConfig::getInstance().KAFKA_STATUS_TOPIC, QString(QJsonDocument(statusObj).toJson(QJsonDocument::Compact)));
        }
    }

    QLogUtil::writeMessageLogByDate(date, msgLogFilename, messagePayload);

    // 4. Hand the frame to the UI for display.
    emit this->sendDataToDraw(phaseData);

    // Trigger the Allan calculation, which also publishes its results to Kafka.
    emit calculateAllen(phaseData->milisecond);
}

/**
 * @brief Recomputes the stability figures of every channel and publishes them.
 *
 * For each channel and each averaging factor 10^j (j = 0..4) the value from
 * calAllan() is stored, or 0.0 when fewer than 5 * 10^j samples are
 * available. The results are emitted through sendAllenToDraw and, when Kafka
 * is enabled and the current second is a multiple of 5, sent to the
 * performance topic.
 *
 * @param milisecond Timestamp of the triggering frame in milliseconds; it is
 *                   truncated to whole seconds for the "ts" field.
 */
void PhaseDevice::onCalculateAllen(qlonglong milisecond)
{
    QJsonArray performArray; // message body: one object per channel
    const qlonglong wholeSecondTs = (milisecond / 1000) * 1000;

    // Stability over the different averaging factors, 1 .. 10000.
    for (int ch = 0; ch < phaseVector.size(); ++ch)
    {
        const int sampleCount = phaseVector[ch].size(); // accumulated samples of this channel
        QStringList displayRow;                         // strings shown in the UI
        QJsonArray tauEntries;                          // per-factor results for Kafka

        for (int decade = 0; decade < 5; ++decade)
        {
            const double tau = qPow(10, decade);

            // At least five times the averaging factor is required;
            // otherwise the result is reported as zero.
            double deviation = 0.0;
            if (sampleCount >= 5 * tau) {
                deviation = calAllan(ch, tau, sampleCount);
            }
            channelAllen[ch][decade] = deviation;

            // Result pair: [value, sample count at this averaging factor].
            const QString deviationText = QString::number(deviation, 'e', 4);
            const QString countText = QString::number(static_cast<int>(sampleCount / tau));
            displayRow << deviationText << countText;

            QJsonObject entry;
            entry.insert("tau", QString::number(tau));
            entry.insert("allen", deviationText);
            entry.insert("count", countText);
            tauEntries.append(entry);
        }

        QJsonObject performData; // stability data of one channel
        performData.insert("channelNo", ch + 1);
        performData.insert("ts", wholeSecondTs);
        performData.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
        performData.insert("master", SettingConfig::getInstance().MASTER);
        performData.insert("deviceId", deviceId);
        performData.insert("data", tauEntries);
        performArray.append(performData);

        channelAllenResultStr[ch] = displayRow;
    }

    // Send to the UI for display.
    emit this->sendAllenToDraw(devCode, channelAllenResultStr);

    // Everything below concerns the Kafka message queue only.
    if (SettingConfig::getInstance().NEED_KAFKA != 1)
    {
        return;
    }

    // Results are published when the current second is a multiple of 5.
    if (QDateTime::currentDateTime().time().second() % 5 != 0)
    {
        return;
    }

    // The backup instance only publishes when the latest master heartbeat
    // is more than 5 seconds old.
    if (SettingConfig::getInstance().MASTER == 0
            && !(QDateTime::currentDateTime().toSecsSinceEpoch() - ConstCache::getInstance().latestHeartTs > 5))
    {
        return;
    }

    kafkaUtil.produceMessage(SettingConfig::getInstance().KAFKA_PERFORM_TOPIC, QString(QJsonDocument(performArray).toJson(QJsonDocument::Compact)));
}

double PhaseDevice::calAllan(int index, int d, int aN)
{
    double tau0 = 1;    //tau0是基本采样间隔,由计数器或比相仪的最小采样间隔决定,最小为1s
    double sum = 0.0;
    double allan[2] = {0.0};
    double *y = new double[aN - 2 * d];
    double tau_2 = qPow(d * tau0, 2);				//pow是计算x的y次幂

    for (int i = 0; i < aN - 2 * d; i++)
    {
        double vi2 = phaseVector[index][i+2*d];
        double vi1 = phaseVector[index][i+d];
        double vi = phaseVector[index][i];
        y[i] = qPow(vi2 - 2 * vi1 + vi, 2);
        sum += y[i];
    }

    allan[0] = sum/(2*tau_2*(aN-2*d));       //delta的平方
    allan[1] = qSqrt(sum/(2*tau_2*(aN-2*d))); //delta

    delete[] y;

    return allan[1];
}