Newer
Older
ZXSSCJ / PhaseCompAcq / PhaseDevice.cpp
#include "PhaseDevice.h"
#include <iostream>
#include <QDateTime>
#include <QThread>
#include <QTimer>
#include <QtMath>
#include <QDebug>

PhaseDevice::PhaseDevice(QObject *parent) : QObject(parent)
{
    connect(&this->serialUtil, &QSerialPortUtil::dataRecieved,
            this, &PhaseDevice::dataReceivedHandler);

    if (SettingConfig::getInstance().NEED_KAFKA == 1) {
        kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS);
        kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC);
        kafkaUtil.createProducer();
    }

    QVector<QVector<double>> initPhase(PHASE_MESSURE_CHANNEL, QVector<double>(0, 0));
    phaseVector = initPhase;

    QVector<QVector<double>> initAllen(PHASE_MESSURE_CHANNEL, QVector<double>(5, 0));
    channelAllen = initAllen;

    QVector<QVector<double>> initSigma(PHASE_MESSURE_CHANNEL, QVector<double>(5, 0));
    channelAllenSigma = initSigma;

    QVector<QStringList> initResult(PHASE_MESSURE_CHANNEL);
    channelAllenResultStr = initResult;

    connect(this, &PhaseDevice::calculateAllen, this, &PhaseDevice::onCalculateAllen);
}

PhaseDevice::~PhaseDevice()
{
    disconnect(&this->serialUtil, &QSerialPortUtil::dataRecieved,
                this, &PhaseDevice::dataReceivedHandler);
}

void PhaseDevice::setComName(QString comName)
{
    this->comName = comName;
}
void PhaseDevice::setBaudRate(int baudRate)
{
    this->baudRate = baudRate;
}
QString PhaseDevice::getDevCode()
{
    return this->devCode;
}
void PhaseDevice::setDevCode(QString devCode)
{
    this->devCode = devCode;
}
void PhaseDevice::setDeviceId(QString deviceId)
{
    this->deviceId = deviceId;
}

bool PhaseDevice::isSerialOpen()
{
    return this->serialUtil.isOpen();
}

void PhaseDevice::initSerialPort()
{   
    int master = SettingConfig::getInstance().MASTER;
    QStringList comNameList = this->comName.split(",");
    if (comNameList.isEmpty() == true)
    {
        return;
    }

    // 如果是主程序则打开主串口 备程序则打开备串口
    if (master == 1)
    {
        this->serialUtil.openSerialPort(comNameList.at(0), this->baudRate);
    } else
    {
        this->serialUtil.openSerialPort(comNameList.at(1), this->baudRate);
    }
}

void PhaseDevice::startWork()
{
    QString startCmd = PhaseProtocolBM::startMessure();
    for (int i = 0; i < 3; i++) {
        this->serialUtil.sendData(startCmd.toLocal8Bit());
        QThread::msleep(100);
    }
}

void PhaseDevice::stopWork()
{
    QString stopCmd = PhaseProtocolBM::stopMessure();
    for (int i = 0; i < 3; i++) {
        this->serialUtil.sendData(stopCmd.toLocal8Bit());
        QThread::msleep(100);
    }
}

void PhaseDevice::dataReceivedHandler(QByteArray data)
{
    this->dataBuff.append(data);

//    std::cout << QByteUtil::binToHexString(this->dataBuff).toStdString() << std::endl;

    PhaseDataDto * phaseData = new PhaseDataDto(this);
    if (PhaseProtocolBM::checkFrame(this->dataBuff) == true)
    {
        phaseData->rawFrame = this->dataBuff;

        // ★解析成数据对象
        bool parse = PhaseProtocolBM::parseMessureData(this->dataBuff, phaseData);

        // 解析成功
        if (parse == true)
        {
            QDateTime now = QDateTime::currentDateTime();
            phaseData->devCode = this->devCode;
            phaseData->timestamp = now.toString("yyyy-MM-dd HH:mm:ss.zzz");
            phaseData->milisecond = now.toMSecsSinceEpoch();
            this->afterFramePhase(phaseData);
        }
    } else if (this->dataBuff.size() > PHASE_FRAM_LENGTH)
    {
        std::cout << QString("%1").arg(this->dataBuff.size()).toStdString() << std::endl;
        this->dataBuff.clear();
    }

    // 在此处释放内存,不影响后续显示
    // 不在此处释放内存则会导致内存持续增加
    // 具体原因不明
    delete phaseData;
}


void PhaseDevice::afterFramePhase(PhaseDataDto * phaseData)
{
    // 1. 清空dataBuff,等待下一帧的数据
    this->dataBuff.clear();

    // 2. 输出到日志文件中
    QString date = phaseData->timestamp.mid(0, 10);

    // 2.1 原始字节数组数据
    QString filename = "raw_" + devCode + ".log";
    QString content = phaseData->timestamp + " " + QByteUtil::binToHexString(phaseData->rawFrame);
    QLogUtil::writeRawDataLogByDate(date, filename, content);

    // 发送到消息队列里的内容
    QJsonArray messageArray;
    QJsonArray performArray;
    QString msgLogFilename = "msg_" + devCode + ".log";
    QString perfLogFilename = "perf_" + devCode + ".log";

    // 2.2 各个通道的相差数据
    for (int i = 1; i <= phaseData->channelActive.size(); i++)
    {
        if (phaseData->channelActive.at(i-1).toUInt() == 1)
        {
            // 存日志
            QString chFilename = QString("%1_CH_%2.log").arg(devCode).arg(i, 2, 10, QLatin1Char('0'));
            QString channelDataStr = QString("%1 [%2] %3").arg(phaseData->timestamp).arg(phaseData->frameId).arg(phaseData->channelDataStr.at(i-1));

            QLogUtil::writeChannelDataLogByDate(date, chFilename, channelDataStr);

            QJsonObject jsonObj = phaseData->toJSON(i - 1);
            jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
            jsonObj.insert("master", SettingConfig::getInstance().MASTER);
            jsonObj.insert("deviceId", deviceId);
            messageArray.append(jsonObj);

            // 将相位数据存入数据栈, 用于计算allen方差
            QMutex mutex;
            mutex.lock();
            phaseVector[i - 1].append(phaseData->channelData.at(i - 1));
            if (phaseVector[i - 1].size() > SettingConfig::getInstance().MAX_DATA_SIZE) {
                phaseVector[i - 1] = phaseVector[i - 1].mid(phaseVector[i - 1].size() - SettingConfig::getInstance().MAX_DATA_SIZE);
            }
            mutex.unlock();
        }
    }

    // 3. 输出到中间件,执行后续处理过程
    QJsonObject statusObj = phaseData->toStatusJSON();
    statusObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
    statusObj.insert("deviceId", deviceId);
    statusObj.insert("master", SettingConfig::getInstance().MASTER);

    if (SettingConfig::getInstance().NEED_KAFKA == 1)
    {
        kafkaUtil.produceMessage(QString(QJsonDocument(messageArray).toJson(QJsonDocument::Compact)));

        // 每10秒发送一次设备状态信息
        if (QDateTime::currentDateTime().time().second() % 10 == 0) {
            kafkaUtil.produceMessage(SettingConfig::getInstance().KAFKA_STATUS_TOPIC, QString(QJsonDocument(statusObj).toJson(QJsonDocument::Compact)));
        }

        // 模拟发送稳定度计算结果
        // 1s  10s  100s  1000s  10000s
        kafkaUtil.produceMessage(SettingConfig::getInstance().KAFKA_PERFORM_TOPIC, QString(QJsonDocument(performArray).toJson(QJsonDocument::Compact)));
    }

    QLogUtil::writeChannelDataLogByDate(date, msgLogFilename, QString(QJsonDocument(messageArray).toJson(QJsonDocument::Compact)));
    QLogUtil::writeChannelDataLogByDate(date, perfLogFilename, QString(QJsonDocument(performArray).toJson(QJsonDocument::Compact)));

    // 4. 在界面上简单显示相差数据结果
    emit this->sendDataToDraw(phaseData);

    // send signal to calculate allen and produce message to kafka
    emit calculateAllen();
}

void PhaseDevice::onCalculateAllen()
{
    // 不同维度的稳定度数据 1s - 10000s
    for (int i = 0; i < phaseVector.size(); i++)
    {
        QStringList result;
        int size = phaseVector[i].size();
        for (int j = 0; j < 5; j++)
        {
            if (size >= 5 * pow(10, j)) {
                channelAllen[i][j] = calAllan(i, pow(10, j), size);
            } else {
                channelAllen[i][j] = 0.0;
            }

            result << QString::number(channelAllen[i][j], 'e', 4) << QString::number((int)(size / pow(10, j)));
        }

        channelAllenResultStr[i] = result;
//        qDebug() << result;
    }

    emit this->sendAllenToDraw(devCode, channelAllenResultStr);
/*
    // 分别计算不同时间维度的稳定度
    int size = phaseVector[i - 1].size();
    if(size >= 5)  allen[0] = calAllan(i - 1, 1, size); else allen[0] = 0;
    if(size >= 50) allen[1] = calAllan(i - 1, 10, size); else allen[1] = 0;
    if(size >= 500) allen[2] = calAllan(i - 1, 100, size); else allen[2] = 0;
    if(size >= 5000) allen[3] = calAllan(i - 1, 1000, size); else allen[3] = 0;
    if(size >= 50000) allen[4] = calAllan(i - 1, 10000, size); else allen[4] = 0;

    // mock perform data
    QJsonObject performData;
    performData.insert("channelNo", i);
    performData.insert("ts", (phaseData->milisecond / 1000) * 1000);
    performData.insert("clientId", SettingConfig::getInstance().CLIENT_ID);
    performData.insert("master", SettingConfig::getInstance().MASTER);
    performData.insert("deviceId", deviceId);
    QJsonArray dataArr;
    for (int k = 0; k < 5; k++) {
        QJsonObject dataObj;
        dataObj.insert("tau", QString::number(qPow(10, k)));
        dataObj.insert("allen", QString::number(allen[k], 'e', 4));
        dataObj.insert("count", QString::number((int)(size / qPow(10, k))));
        dataArr.append(dataObj);

        QStringList allenStrList;
        allenStrList << QString::number(qPow(10, k)) << QString::number(allen[k], 'e', 4) << QString::number((int)(size / qPow(10, k)));
        phaseData->allenDataStr.append(allenStrList);
    }
    performData.insert("data", dataArr);
    performArray.append(performData);
    */
}

double PhaseDevice::calAllan(int index, int d, int aN)
{
    int i = 0;
    double tau0 = 1;    //tau0是基本采样间隔,由计数器或比相仪的最小采样间隔决定,最小为1s
    double allan[2] = {0.0};
    double *y = new double[3];
    double tau_2 = pow(d * tau0, 2);				//pow是计算x的y次幂

    int logd = log10(d);

    double sum = channelAllenSigma[index][logd];

    for (int j = 3; j > 0; j--)
    {
        i = aN - 2*d - j;
        double v2 = phaseVector[index][i+2*d];
        double v1 = phaseVector[index][i+d];
        double v0 = phaseVector[index][i];
        y[3 - j] = pow(v2 - 2 * v1 + v0, 2);

        sum += y[3 - j];
    }

    channelAllenSigma[index][logd] += y[0];

    allan[0] = sum/(2*tau_2*(aN-2*d));       //delta的平方
    allan[1] = sqrt(sum/(2*tau_2*(aN-2*d))); //delta

    delete[] y;

    return allan[1];
}