diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/common/utils/QSerialPortUtil.h b/CounterAcq/common/utils/QSerialPortUtil.h index 1a7f62f..accf403 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.h +++ b/CounterAcq/common/utils/QSerialPortUtil.h @@ -21,6 +21,8 @@ bool open; + void mockReceivData(); + signals: void dataRecieved(QByteArray data); // 收到数据的信号 }; diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/common/utils/QSerialPortUtil.h b/CounterAcq/common/utils/QSerialPortUtil.h index 1a7f62f..accf403 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.h +++ b/CounterAcq/common/utils/QSerialPortUtil.h @@ -21,6 +21,8 @@ bool open; + void mockReceivData(); + signals: void dataRecieved(QByteArray data); // 收到数据的信号 }; diff --git a/CounterAcq/common/utils/SettingConfig.cpp b/CounterAcq/common/utils/SettingConfig.cpp index 0c9d89d..67a8dd0 100644 --- a/CounterAcq/common/utils/SettingConfig.cpp +++ b/CounterAcq/common/utils/SettingConfig.cpp @@ -9,6 +9,7 @@ BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + NEED_KAFKA = getProperty("kafka", "needKafka").toUInt(); KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/common/utils/QSerialPortUtil.h b/CounterAcq/common/utils/QSerialPortUtil.h index 1a7f62f..accf403 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.h +++ b/CounterAcq/common/utils/QSerialPortUtil.h @@ -21,6 +21,8 @@ bool open; + void mockReceivData(); + signals: void dataRecieved(QByteArray data); // 收到数据的信号 }; diff --git a/CounterAcq/common/utils/SettingConfig.cpp b/CounterAcq/common/utils/SettingConfig.cpp index 0c9d89d..67a8dd0 100644 --- a/CounterAcq/common/utils/SettingConfig.cpp +++ b/CounterAcq/common/utils/SettingConfig.cpp @@ -9,6 +9,7 @@ BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + NEED_KAFKA = getProperty("kafka", "needKafka").toUInt(); KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); diff --git a/CounterAcq/common/utils/SettingConfig.h b/CounterAcq/common/utils/SettingConfig.h index 8f752a0..2d10a4d 100644 --- a/CounterAcq/common/utils/SettingConfig.h +++ b/CounterAcq/common/utils/SettingConfig.h @@ -31,6 +31,7 @@ int BAUD_RATE; QString DEV_CODES; + int NEED_KAFKA; QString KAFKA_BROKERS; QString KAFKA_DATA_TOPIC; diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/common/utils/QSerialPortUtil.h b/CounterAcq/common/utils/QSerialPortUtil.h index 1a7f62f..accf403 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.h +++ b/CounterAcq/common/utils/QSerialPortUtil.h @@ -21,6 +21,8 @@ bool open; + void mockReceivData(); + signals: void dataRecieved(QByteArray data); // 收到数据的信号 }; diff --git a/CounterAcq/common/utils/SettingConfig.cpp b/CounterAcq/common/utils/SettingConfig.cpp index 0c9d89d..67a8dd0 100644 --- a/CounterAcq/common/utils/SettingConfig.cpp +++ b/CounterAcq/common/utils/SettingConfig.cpp @@ -9,6 +9,7 @@ BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + NEED_KAFKA = getProperty("kafka", "needKafka").toUInt(); KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); diff --git a/CounterAcq/common/utils/SettingConfig.h b/CounterAcq/common/utils/SettingConfig.h index 8f752a0..2d10a4d 100644 --- a/CounterAcq/common/utils/SettingConfig.h +++ b/CounterAcq/common/utils/SettingConfig.h @@ -31,6 +31,7 @@ int BAUD_RATE; QString DEV_CODES; + int NEED_KAFKA; QString KAFKA_BROKERS; QString KAFKA_DATA_TOPIC; diff --git a/CounterAcq/conf/config.ini b/CounterAcq/conf/config.ini index 1430594..fdc1c17 100644 --- a/CounterAcq/conf/config.ini +++ b/CounterAcq/conf/config.ini @@ -4,8 +4,9 @@ baudRate=115200 [kafka] +neekKafka=1 brokers="111.198.10.15:12502" -dataTopic="clock-data" +dataTopic="cppTest" [client] clientId="112233445566" diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/common/utils/QSerialPortUtil.h b/CounterAcq/common/utils/QSerialPortUtil.h index 1a7f62f..accf403 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.h +++ b/CounterAcq/common/utils/QSerialPortUtil.h @@ -21,6 +21,8 @@ bool open; + void mockReceivData(); + signals: void dataRecieved(QByteArray data); // 收到数据的信号 }; diff --git a/CounterAcq/common/utils/SettingConfig.cpp b/CounterAcq/common/utils/SettingConfig.cpp index 0c9d89d..67a8dd0 100644 --- a/CounterAcq/common/utils/SettingConfig.cpp +++ b/CounterAcq/common/utils/SettingConfig.cpp @@ -9,6 +9,7 @@ BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + NEED_KAFKA = getProperty("kafka", "needKafka").toUInt(); KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); diff --git a/CounterAcq/common/utils/SettingConfig.h b/CounterAcq/common/utils/SettingConfig.h index 8f752a0..2d10a4d 100644 --- a/CounterAcq/common/utils/SettingConfig.h +++ b/CounterAcq/common/utils/SettingConfig.h @@ -31,6 +31,7 @@ int BAUD_RATE; QString DEV_CODES; + int NEED_KAFKA; QString KAFKA_BROKERS; QString KAFKA_DATA_TOPIC; diff --git a/CounterAcq/conf/config.ini b/CounterAcq/conf/config.ini index 1430594..fdc1c17 100644 --- a/CounterAcq/conf/config.ini +++ b/CounterAcq/conf/config.ini @@ -4,8 +4,9 @@ baudRate=115200 [kafka] +neekKafka=1 brokers="111.198.10.15:12502" -dataTopic="clock-data" +dataTopic="cppTest" [client] clientId="112233445566" diff --git a/CounterAcq/protocol/CounterProtocolBM.cpp b/CounterAcq/protocol/CounterProtocolBM.cpp index 63a047b..b33be0d 100644 --- a/CounterAcq/protocol/CounterProtocolBM.cpp +++ b/CounterAcq/protocol/CounterProtocolBM.cpp @@ -20,17 +20,17 @@ QString contentStr = QString(content); QStringList subList = contentStr.split(","); - std::cout << contentStr.toStdString() << std::endl; - counterData->type = subList.at(0).toUInt(); counterData->channelId = subList.at(1).toUInt(); counterData->channelActive = subList.at(2).toUInt(); counterData->channelRefId = subList.at(3).toUInt(); - counterData->channelData = subList.at(4).toULongLong(); + counterData->channelData = subList.at(4).toLongLong(); counterData->load = subList.at(5).toDouble(); counterData->level = subList.at(6).toDouble(); counterData->frameId = subList.at(7); + counterData->channelClockValue = counterData->channelData * 1E-11; + return true; } diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/common/utils/QSerialPortUtil.h b/CounterAcq/common/utils/QSerialPortUtil.h index 1a7f62f..accf403 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.h +++ b/CounterAcq/common/utils/QSerialPortUtil.h @@ -21,6 +21,8 @@ bool open; + void mockReceivData(); + signals: void dataRecieved(QByteArray data); // 收到数据的信号 }; diff --git a/CounterAcq/common/utils/SettingConfig.cpp b/CounterAcq/common/utils/SettingConfig.cpp index 0c9d89d..67a8dd0 100644 --- a/CounterAcq/common/utils/SettingConfig.cpp +++ b/CounterAcq/common/utils/SettingConfig.cpp @@ -9,6 +9,7 @@ BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + NEED_KAFKA = getProperty("kafka", "needKafka").toUInt(); KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); diff --git a/CounterAcq/common/utils/SettingConfig.h b/CounterAcq/common/utils/SettingConfig.h index 8f752a0..2d10a4d 100644 --- a/CounterAcq/common/utils/SettingConfig.h +++ b/CounterAcq/common/utils/SettingConfig.h @@ -31,6 +31,7 @@ int BAUD_RATE; QString DEV_CODES; + int NEED_KAFKA; QString KAFKA_BROKERS; QString KAFKA_DATA_TOPIC; diff --git a/CounterAcq/conf/config.ini b/CounterAcq/conf/config.ini index 1430594..fdc1c17 100644 --- a/CounterAcq/conf/config.ini +++ b/CounterAcq/conf/config.ini @@ -4,8 +4,9 @@ baudRate=115200 [kafka] +neekKafka=1 brokers="111.198.10.15:12502" -dataTopic="clock-data" +dataTopic="cppTest" [client] clientId="112233445566" diff --git a/CounterAcq/protocol/CounterProtocolBM.cpp b/CounterAcq/protocol/CounterProtocolBM.cpp index 63a047b..b33be0d 100644 --- a/CounterAcq/protocol/CounterProtocolBM.cpp +++ b/CounterAcq/protocol/CounterProtocolBM.cpp @@ -20,17 +20,17 @@ QString contentStr = QString(content); QStringList subList = contentStr.split(","); - std::cout << contentStr.toStdString() << std::endl; - counterData->type = subList.at(0).toUInt(); counterData->channelId = subList.at(1).toUInt(); counterData->channelActive = subList.at(2).toUInt(); counterData->channelRefId = subList.at(3).toUInt(); - counterData->channelData = subList.at(4).toULongLong(); + counterData->channelData = subList.at(4).toLongLong(); counterData->load = subList.at(5).toDouble(); counterData->level = subList.at(6).toDouble(); counterData->frameId = subList.at(7); + counterData->channelClockValue = counterData->channelData * 1E-11; + return true; } diff --git a/CounterAcq/protocol/dto/CounterDataDto.cpp b/CounterAcq/protocol/dto/CounterDataDto.cpp index c86988a..99b8b09 100644 --- a/CounterAcq/protocol/dto/CounterDataDto.cpp +++ b/CounterAcq/protocol/dto/CounterDataDto.cpp @@ -12,11 +12,11 @@ QJsonObject dataObj; dataObj.insert("channelRefNo", this->channelRefId); - dataObj.insert("dataValue", this->channelData); + dataObj.insert("dataValue", QString("%1").arg(this->channelClockValue)); dataObj.insert("frameId", this->frameId); jsonObj.insert("channelNo", this->channelId); - jsonObj.insert("js", this->milisecond); + jsonObj.insert("ts", this->milisecond); jsonObj.insert("data", dataObj); return jsonObj; diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/common/utils/QSerialPortUtil.h b/CounterAcq/common/utils/QSerialPortUtil.h index 1a7f62f..accf403 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.h +++ b/CounterAcq/common/utils/QSerialPortUtil.h @@ -21,6 +21,8 @@ bool open; + void mockReceivData(); + signals: void dataRecieved(QByteArray data); // 收到数据的信号 }; diff --git a/CounterAcq/common/utils/SettingConfig.cpp b/CounterAcq/common/utils/SettingConfig.cpp index 0c9d89d..67a8dd0 100644 --- a/CounterAcq/common/utils/SettingConfig.cpp +++ b/CounterAcq/common/utils/SettingConfig.cpp @@ -9,6 +9,7 @@ BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + NEED_KAFKA = getProperty("kafka", "needKafka").toUInt(); KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); diff --git a/CounterAcq/common/utils/SettingConfig.h b/CounterAcq/common/utils/SettingConfig.h index 8f752a0..2d10a4d 100644 --- a/CounterAcq/common/utils/SettingConfig.h +++ b/CounterAcq/common/utils/SettingConfig.h @@ -31,6 +31,7 @@ int BAUD_RATE; QString DEV_CODES; + int NEED_KAFKA; QString KAFKA_BROKERS; QString KAFKA_DATA_TOPIC; diff --git a/CounterAcq/conf/config.ini b/CounterAcq/conf/config.ini index 1430594..fdc1c17 100644 --- a/CounterAcq/conf/config.ini +++ b/CounterAcq/conf/config.ini @@ -4,8 +4,9 @@ baudRate=115200 [kafka] +neekKafka=1 brokers="111.198.10.15:12502" -dataTopic="clock-data" +dataTopic="cppTest" [client] clientId="112233445566" diff --git a/CounterAcq/protocol/CounterProtocolBM.cpp b/CounterAcq/protocol/CounterProtocolBM.cpp index 63a047b..b33be0d 100644 --- a/CounterAcq/protocol/CounterProtocolBM.cpp +++ b/CounterAcq/protocol/CounterProtocolBM.cpp @@ -20,17 +20,17 @@ QString contentStr = QString(content); QStringList subList = contentStr.split(","); - std::cout << contentStr.toStdString() << std::endl; - counterData->type = subList.at(0).toUInt(); counterData->channelId = subList.at(1).toUInt(); counterData->channelActive = subList.at(2).toUInt(); counterData->channelRefId = subList.at(3).toUInt(); - counterData->channelData = subList.at(4).toULongLong(); + counterData->channelData = subList.at(4).toLongLong(); counterData->load = subList.at(5).toDouble(); counterData->level = subList.at(6).toDouble(); counterData->frameId = subList.at(7); + counterData->channelClockValue = counterData->channelData * 1E-11; + return true; } diff --git a/CounterAcq/protocol/dto/CounterDataDto.cpp b/CounterAcq/protocol/dto/CounterDataDto.cpp index c86988a..99b8b09 100644 --- a/CounterAcq/protocol/dto/CounterDataDto.cpp +++ b/CounterAcq/protocol/dto/CounterDataDto.cpp @@ -12,11 +12,11 @@ QJsonObject dataObj; dataObj.insert("channelRefNo", this->channelRefId); - dataObj.insert("dataValue", this->channelData); + dataObj.insert("dataValue", QString("%1").arg(this->channelClockValue)); dataObj.insert("frameId", this->frameId); jsonObj.insert("channelNo", this->channelId); - jsonObj.insert("js", this->milisecond); + jsonObj.insert("ts", this->milisecond); jsonObj.insert("data", dataObj); return jsonObj; diff --git a/CounterAcq/protocol/dto/CounterDataDto.h b/CounterAcq/protocol/dto/CounterDataDto.h index 3008f48..6f4488f 100644 --- a/CounterAcq/protocol/dto/CounterDataDto.h +++ b/CounterAcq/protocol/dto/CounterDataDto.h @@ -20,6 +20,8 @@ qint8 channelId; // 测量通道号 -- <2> qint8 type = 0; // 测量状态 -- <1> + double channelClockValue; // + QByteArray rawFrame; // 原始帧字节数组 QString timestamp; // 时间戳字符串 diff --git a/CounterAcq/CounterDevice.cpp b/CounterAcq/CounterDevice.cpp index 3dc812b..113f83b 100644 --- a/CounterAcq/CounterDevice.cpp +++ b/CounterAcq/CounterDevice.cpp @@ -7,6 +7,10 @@ { connect(&this->serialUtil, &QSerialPortUtil::dataRecieved, this, &CounterDevice::dataReceivedHandler); + + kafkaUtil.setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil.setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil.createProducer(); } CounterDevice::~CounterDevice() @@ -74,15 +78,6 @@ void CounterDevice::afterFramePhase(CounterDataDto * counterData) { - std::cout << counterData->frameId.toStdString() << ", " << counterData->timestamp.toStdString() << ", " << counterData->milisecond << std::endl; -// for (int i = 1; i <= PHASE_MESSURE_CHANNEL; i++) -// { -// if (phaseData->channelActive.at(i-1).toInt() != 0) -// { -// std::cout << "ch: " << i << ", " << phaseData->channelData.at(i-1) << std::endl; -// } -// } - // 1. 清空dataBuff,等待下一帧的数据 this->dataBuff.clear(); @@ -95,28 +90,29 @@ QString content = counterData->timestamp + " " + counterData->rawFrame.left(counterData->rawFrame.size() - COUNTER_FRAME_TAIL.size()); QLogUtil::writeRawDataLog(filename, content); - // 2.2 各个通道的相差数据 -// for (int i = 1; i <= phaseData->channelActive.size(); i++) -// { -// if (phaseData->channelActive.at(i-1).toUInt() == 1) -// { -// QString chFilename("%1CH_%2_%3.log"); -// chFilename = chFilename.arg(filePrefix); -// if (i < 10) -// { -// chFilename = chFilename.arg(QString("0%1").arg(i)); -// } else -// { -// chFilename = chFilename.arg(i); -// } -// chFilename = chFilename.arg(filePostfix); -// QString channelDataStr = QString("%1 %2 %3").arg(phaseData->timestamp).arg(phaseData->channelData.at(i-1)).arg(phaseData->frameId); + // 2.2 各个通道的clock diff数据 + QString chFilename("%1CH_%2_%3.log"); + chFilename = chFilename.arg(filePrefix); + if (counterData->channelId < 10) + { + chFilename = chFilename.arg(QString("0%1").arg(counterData->channelId)); + } else + { + chFilename = chFilename.arg(counterData->channelId); + } + chFilename = chFilename.arg(filePostfix); + QString channelDataStr = QString("%1 %2 %3").arg(counterData->timestamp).arg(counterData->channelClockValue).arg(counterData->frameId); -// QLogUtil::writeChannelDataLog(chFilename, channelDataStr); -// } -// } + QLogUtil::writeChannelDataLog(chFilename, channelDataStr); // 3. 输出到中间件,执行后续处理过程 + if (SettingConfig::getInstance().NEED_KAFKA == 1) + { + QJsonObject jsonObj = counterData->toJSON(); + jsonObj.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + jsonObj.insert("deviceId", devCode); + kafkaUtil.produceMessage(QString(QJsonDocument(jsonObj).toJson(QJsonDocument::Compact))); + } // 4. 在界面上简单显示相差数据结果 // emit this->sendDataToDraw(counterData); diff --git a/CounterAcq/CounterDevice.h b/CounterAcq/CounterDevice.h index 3ca84da..2e5f04e 100644 --- a/CounterAcq/CounterDevice.h +++ b/CounterAcq/CounterDevice.h @@ -4,8 +4,10 @@ #include #include "common/utils/QSerialPortUtil.h" +#include "common/utils/QKafkaUtil.h" #include "common/utils/QByteUtil.h" #include "common/utils/QLogUtil.h" +#include "common/utils/SettingConfig.h" #include "protocol/CounterProtocolBM.h" class CounterDevice : public QObject @@ -32,6 +34,7 @@ int baudRate; QSerialPortUtil serialUtil; + QKafkaUtil kafkaUtil; QByteArray dataBuff; signals: diff --git a/CounterAcq/CounterWindow.cpp b/CounterAcq/CounterWindow.cpp index 4bad032..6059e0b 100644 --- a/CounterAcq/CounterWindow.cpp +++ b/CounterAcq/CounterWindow.cpp @@ -1,6 +1,10 @@ #include "CounterWindow.h" #include "ui_CounterWindow.h" +#include +#include +#include + CounterWindow::CounterWindow(QWidget *parent) : QWidget(parent), ui(new Ui::CounterWindow) @@ -32,6 +36,9 @@ device->initSerialPort(); // device->startWork(); } + + timer = new QTimer(this); + timer->start(1000 * 10); } CounterWindow::~CounterWindow() @@ -41,5 +48,48 @@ void CounterWindow::on_pushButton_clicked() { + connect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); +} + +void CounterWindow::mockOneFrame() +{ qlonglong deviceId = 9101; + + QKafkaUtil * kafkaUtil = new QKafkaUtil(this); + kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + kafkaUtil->createProducer(); + + QDateTime now = QDateTime::currentDateTime(); + + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + qlonglong ts = now.toMSecsSinceEpoch(); + + for (int i = 0; i < 16; i++) { + CounterDataDto channelDataDto; + + channelDataDto.devCode = deviceId; + + channelDataDto.frameId = frameId; + channelDataDto.channelData = qrand() % 400; + channelDataDto.channelRefId = 0; + channelDataDto.channelId = i; + channelDataDto.milisecond = ts; + channelDataDto.timestamp = now.toString("yyyyMMddHHmmssZZZ"); + + QJsonObject dtoJson = channelDataDto.toJSON(); + dtoJson.insert("clientId", SettingConfig::getInstance().CLIENT_ID); + dtoJson.insert("deviceId", deviceId); + + std::cout << QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact)).toStdString() << std::endl; + + kafkaUtil->produceMessage(QString(QJsonDocument(dtoJson).toJson(QJsonDocument::Compact))); + } +} + +void CounterWindow::on_pushButton_2_clicked() +{ + disconnect(timer, &QTimer::timeout, + this, &CounterWindow::mockOneFrame); } diff --git a/CounterAcq/CounterWindow.h b/CounterAcq/CounterWindow.h index c0f439d..c95eebc 100644 --- a/CounterAcq/CounterWindow.h +++ b/CounterAcq/CounterWindow.h @@ -3,6 +3,7 @@ #include #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" #include "CounterDevice.h" namespace Ui { @@ -20,8 +21,14 @@ private slots: void on_pushButton_clicked(); + void mockOneFrame(); + + void on_pushButton_2_clicked(); + private: Ui::CounterWindow *ui; + + QTimer * timer; }; #endif // COUNTERWINDOW_H diff --git a/CounterAcq/CounterWindow.ui b/CounterAcq/CounterWindow.ui index 60a7bac..a0d666c 100644 --- a/CounterAcq/CounterWindow.ui +++ b/CounterAcq/CounterWindow.ui @@ -17,7 +17,7 @@ 100 - 120 + 40 181 51 @@ -26,6 +26,19 @@ Mock ClockDiff Data + + + + 100 + 120 + 181 + 41 + + + + Stop Mock + + diff --git a/CounterAcq/common/utils/QSerialPortUtil.cpp b/CounterAcq/common/utils/QSerialPortUtil.cpp index 2d54019..3870199 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.cpp +++ b/CounterAcq/common/utils/QSerialPortUtil.cpp @@ -1,6 +1,8 @@ #include "QSerialPortUtil.h" #include +#include +#include QSerialPortUtil::QSerialPortUtil(QObject *parent) : QObject(parent) { @@ -18,12 +20,18 @@ open = serial.open(QIODevice::ReadWrite); - if (open == true) - { +// if (open == true) +// { // 绑定信号与槽 connect(&serial, &QSerialPort::readyRead, this, &QSerialPortUtil::readData); - } + + // mock data received per second + QTimer * timer = new QTimer(this); + connect(timer, &QTimer::timeout, + this, &QSerialPortUtil::mockReceivData); + timer->start(1000 * 5); +// } } void QSerialPortUtil::sendData(QByteArray data) @@ -46,3 +54,25 @@ { return this->open; } + +void QSerialPortUtil::mockReceivData() +{ + QDateTime now = QDateTime::currentDateTime(); + for (int i = 1; i <= 8; i++) + { + QByteArray buffer; + + QString channel = QString("%1").arg(i); + QString channelRef = "1"; + QString dataValue = QString("%1").arg(qrand() % 400); + QString level = QString("%1").arg(qrand() % 4 / (double) 10); + QString frameId = QString("%1").arg(now.toSecsSinceEpoch() % 10000); + + buffer.append("$GL,") + .append("0,").append(channel + ",").append("1,").append(channelRef + ",") + .append(dataValue + ",").append("1,").append(level + ",").append(frameId + "*") + .append("00").append("\r\n"); + + emit dataRecieved(buffer); + } +} diff --git a/CounterAcq/common/utils/QSerialPortUtil.h b/CounterAcq/common/utils/QSerialPortUtil.h index 1a7f62f..accf403 100644 --- a/CounterAcq/common/utils/QSerialPortUtil.h +++ b/CounterAcq/common/utils/QSerialPortUtil.h @@ -21,6 +21,8 @@ bool open; + void mockReceivData(); + signals: void dataRecieved(QByteArray data); // 收到数据的信号 }; diff --git a/CounterAcq/common/utils/SettingConfig.cpp b/CounterAcq/common/utils/SettingConfig.cpp index 0c9d89d..67a8dd0 100644 --- a/CounterAcq/common/utils/SettingConfig.cpp +++ b/CounterAcq/common/utils/SettingConfig.cpp @@ -9,6 +9,7 @@ BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + NEED_KAFKA = getProperty("kafka", "needKafka").toUInt(); KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); diff --git a/CounterAcq/common/utils/SettingConfig.h b/CounterAcq/common/utils/SettingConfig.h index 8f752a0..2d10a4d 100644 --- a/CounterAcq/common/utils/SettingConfig.h +++ b/CounterAcq/common/utils/SettingConfig.h @@ -31,6 +31,7 @@ int BAUD_RATE; QString DEV_CODES; + int NEED_KAFKA; QString KAFKA_BROKERS; QString KAFKA_DATA_TOPIC; diff --git a/CounterAcq/conf/config.ini b/CounterAcq/conf/config.ini index 1430594..fdc1c17 100644 --- a/CounterAcq/conf/config.ini +++ b/CounterAcq/conf/config.ini @@ -4,8 +4,9 @@ baudRate=115200 [kafka] +neekKafka=1 brokers="111.198.10.15:12502" -dataTopic="clock-data" +dataTopic="cppTest" [client] clientId="112233445566" diff --git a/CounterAcq/protocol/CounterProtocolBM.cpp b/CounterAcq/protocol/CounterProtocolBM.cpp index 63a047b..b33be0d 100644 --- a/CounterAcq/protocol/CounterProtocolBM.cpp +++ b/CounterAcq/protocol/CounterProtocolBM.cpp @@ -20,17 +20,17 @@ QString contentStr = QString(content); QStringList subList = contentStr.split(","); - std::cout << contentStr.toStdString() << std::endl; - counterData->type = subList.at(0).toUInt(); counterData->channelId = subList.at(1).toUInt(); counterData->channelActive = subList.at(2).toUInt(); counterData->channelRefId = subList.at(3).toUInt(); - counterData->channelData = subList.at(4).toULongLong(); + counterData->channelData = subList.at(4).toLongLong(); counterData->load = subList.at(5).toDouble(); counterData->level = subList.at(6).toDouble(); counterData->frameId = subList.at(7); + counterData->channelClockValue = counterData->channelData * 1E-11; + return true; } diff --git a/CounterAcq/protocol/dto/CounterDataDto.cpp b/CounterAcq/protocol/dto/CounterDataDto.cpp index c86988a..99b8b09 100644 --- a/CounterAcq/protocol/dto/CounterDataDto.cpp +++ b/CounterAcq/protocol/dto/CounterDataDto.cpp @@ -12,11 +12,11 @@ QJsonObject dataObj; dataObj.insert("channelRefNo", this->channelRefId); - dataObj.insert("dataValue", this->channelData); + dataObj.insert("dataValue", QString("%1").arg(this->channelClockValue)); dataObj.insert("frameId", this->frameId); jsonObj.insert("channelNo", this->channelId); - jsonObj.insert("js", this->milisecond); + jsonObj.insert("ts", this->milisecond); jsonObj.insert("data", dataObj); return jsonObj; diff --git a/CounterAcq/protocol/dto/CounterDataDto.h b/CounterAcq/protocol/dto/CounterDataDto.h index 3008f48..6f4488f 100644 --- a/CounterAcq/protocol/dto/CounterDataDto.h +++ b/CounterAcq/protocol/dto/CounterDataDto.h @@ -20,6 +20,8 @@ qint8 channelId; // 测量通道号 -- <2> qint8 type = 0; // 测量状态 -- <1> + double channelClockValue; // + QByteArray rawFrame; // 原始帧字节数组 QString timestamp; // 时间戳字符串 diff --git a/ZXSSCJ.pro b/ZXSSCJ.pro index 87b1bd3..5d8d1e2 100644 --- a/ZXSSCJ.pro +++ b/ZXSSCJ.pro @@ -4,6 +4,6 @@ #CONFIG += ordered SUBDIRS += CounterAcq #计数器数据采集 -SUBDIRS += PhaseCompAcq #比相仪数据采集 +#SUBDIRS += PhaseCompAcq #比相仪数据采集 #SUBDIRS += DevStatusAcq #SUBDIRS += HClockAcq #氢钟状态数据采集