diff --git a/DevStatusAcq/DevStatusWindow.cpp b/DevStatusAcq/DevStatusWindow.cpp index 5e11c14..3d3b427 100644 --- a/DevStatusAcq/DevStatusWindow.cpp +++ b/DevStatusAcq/DevStatusWindow.cpp @@ -51,11 +51,25 @@ connect(freqTunDevice, &FrequencyTuning::sendDataToDraw, this, &DevStatusWindow::drawFrameDataOnPage); + + this->kafkaConsumer = new QKafkaConsumer(this); + + connect(kafkaConsumer, &QKafkaConsumer::messageRecieved, + this, &DevStatusWindow::devSettingCommandRecieved); + + kafkaConsumer->setBrokers("111.198.10.15:12502"); + kafkaConsumer->setTopic("cppTest"); + kafkaConsumer->createConsumer(); + kafkaConsumer->start(); + } DevStatusWindow::~DevStatusWindow() { + kafkaConsumer->deleteLater(); + delete ui; + delete kafkaConsumer; } void DevStatusWindow::drawFrameDataOnPage(DeviceFrameBaseDto * frameData) @@ -80,6 +94,67 @@ } } +void DevStatusWindow::devSettingCommandRecieved(QString commandJsonStr) +{ + std::cout << commandJsonStr.toStdString() << std::endl; + QJsonParseError jsonErr; + QJsonDocument doc = QJsonDocument::fromJson(commandJsonStr.toUtf8(), &jsonErr); + if (jsonErr.error == QJsonParseError::NoError) + { + QJsonObject commandJson = doc.object(); + QString deviceId; + QString deviceType; + QString command; + QString params; + if (commandJson.contains("deviceType") == true) + { + deviceType = commandJson.value("deviceType").toString(); + if (deviceType.isEmpty() == false) + { + for (int i = 0; i < ui->devTypeSelect->count(); i++) + { + if (ui->devTypeSelect->itemData(i).toString() == deviceType) + { + ui->devTypeSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("deviceId") == true) + { + deviceId = commandJson.value("deviceId").toString(); + if (deviceId.isEmpty() == false) + { + for (int i = 0; i < ui->devSelect->count(); i++) + { + if (ui->devSelect->itemData(i).toJsonObject().find("deviceId")->toString() == deviceId) + { + ui->devSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("command") == true) + { + command = commandJson.value("command").toString(); + } + if (commandJson.contains("params") == true) + { + params = commandJson.value("params").toString(); + } + + std::cout << deviceId.toStdString() << " " << deviceType.toStdString() << " " << command.toStdString() << " " << params.toStdString() << std::endl; + + if (deviceType == "03") + { + QByteArray commandBytes = freqTunDevice->protocol->generateSettingCommand(command, params); + freqTunDevice->sendDataToSerial(commandBytes); + } + } +} + int DevStatusWindow::initHttpToken() { QJsonObject response = httpReq->getTokenByClientId(SettingConfig::getInstance().CLIENT_ID, diff --git a/DevStatusAcq/DevStatusWindow.cpp b/DevStatusAcq/DevStatusWindow.cpp index 5e11c14..3d3b427 100644 --- a/DevStatusAcq/DevStatusWindow.cpp +++ b/DevStatusAcq/DevStatusWindow.cpp @@ -51,11 +51,25 @@ connect(freqTunDevice, &FrequencyTuning::sendDataToDraw, this, &DevStatusWindow::drawFrameDataOnPage); + + this->kafkaConsumer = new QKafkaConsumer(this); + + connect(kafkaConsumer, &QKafkaConsumer::messageRecieved, + this, &DevStatusWindow::devSettingCommandRecieved); + + kafkaConsumer->setBrokers("111.198.10.15:12502"); + kafkaConsumer->setTopic("cppTest"); + kafkaConsumer->createConsumer(); + kafkaConsumer->start(); + } DevStatusWindow::~DevStatusWindow() { + kafkaConsumer->deleteLater(); + delete ui; + delete kafkaConsumer; } void DevStatusWindow::drawFrameDataOnPage(DeviceFrameBaseDto * frameData) @@ -80,6 +94,67 @@ } } +void DevStatusWindow::devSettingCommandRecieved(QString commandJsonStr) +{ + std::cout << commandJsonStr.toStdString() << std::endl; + QJsonParseError jsonErr; + QJsonDocument doc = QJsonDocument::fromJson(commandJsonStr.toUtf8(), &jsonErr); + if (jsonErr.error == QJsonParseError::NoError) + { + QJsonObject commandJson = doc.object(); + QString deviceId; + QString deviceType; + QString command; + QString params; + if (commandJson.contains("deviceType") == true) + { + deviceType = commandJson.value("deviceType").toString(); + if (deviceType.isEmpty() == false) + { + for (int i = 0; i < ui->devTypeSelect->count(); i++) + { + if (ui->devTypeSelect->itemData(i).toString() == deviceType) + { + ui->devTypeSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("deviceId") == true) + { + deviceId = commandJson.value("deviceId").toString(); + if (deviceId.isEmpty() == false) + { + for (int i = 0; i < ui->devSelect->count(); i++) + { + if (ui->devSelect->itemData(i).toJsonObject().find("deviceId")->toString() == deviceId) + { + ui->devSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("command") == true) + { + command = commandJson.value("command").toString(); + } + if (commandJson.contains("params") == true) + { + params = commandJson.value("params").toString(); + } + + std::cout << deviceId.toStdString() << " " << deviceType.toStdString() << " " << command.toStdString() << " " << params.toStdString() << std::endl; + + if (deviceType == "03") + { + QByteArray commandBytes = freqTunDevice->protocol->generateSettingCommand(command, params); + freqTunDevice->sendDataToSerial(commandBytes); + } + } +} + int DevStatusWindow::initHttpToken() { QJsonObject response = httpReq->getTokenByClientId(SettingConfig::getInstance().CLIENT_ID, diff --git a/DevStatusAcq/DevStatusWindow.h b/DevStatusAcq/DevStatusWindow.h index d27b5c6..71668ae 100644 --- a/DevStatusAcq/DevStatusWindow.h +++ b/DevStatusAcq/DevStatusWindow.h @@ -11,6 +11,7 @@ #include "device/FreqReplicator.h" #include "device/BCodeTerminal.h" #include "common/HttpRequestController.h" +#include "common/utils/QKafkaConsumer.h" QT_BEGIN_NAMESPACE namespace Ui { class DevStatusWindow; } @@ -27,25 +28,20 @@ public slots: void drawFrameDataOnPage(DeviceFrameBaseDto * frameData); + void devSettingCommandRecieved(QString commandJsonStr); + private slots: void on_devTypeSelect_currentIndexChanged(int index); void on_sigGenButt_clicked(); - void on_freqTunButt_clicked(); - void on_tmSwiButt_clicked(); - void on_freqSwiButt_clicked(); - void on_tmRepButt_clicked(); - void on_freqRepButt_clicked(); - void on_bctButt_clicked(); void on_exitButt_clicked(); - void on_minButt_clicked(); void on_ftSetFreqTurnButt_clicked(); @@ -55,27 +51,16 @@ void on_ftSetPulseWidthButt_clicked(); void on_sgLeapSecondSetButt_clicked(); - void on_sgSingleSynchSetButt_clicked(); - void on_sgDateSetButt_clicked(); - void on_sgSecondWidthSetButt_clicked(); - void on_sgBacRatioSetButt_clicked(); - void on_sgBacRangeSetButt_clicked(); - void on_sgOppsPhaseShiftSetButt_clicked(); - void on_sgLeapTimestampSetButt_clicked(); - void on_sgMJDDateSetButt_clicked(); - void on_sgTimeSetButt_clicked(); - void on_sgKeyControlSetButt_clicked(); - void on_sgTimeTypeSetButt_clicked(); private: @@ -94,5 +79,7 @@ TimeReplicator * timeRepDevice; FreqReplicator * freqRepDevice; BCodeTerminal * bCodeTermDevice; + + QKafkaConsumer * kafkaConsumer; }; #endif // DEVSTATUSWINDOW_H diff --git a/DevStatusAcq/DevStatusWindow.cpp b/DevStatusAcq/DevStatusWindow.cpp index 5e11c14..3d3b427 100644 --- a/DevStatusAcq/DevStatusWindow.cpp +++ b/DevStatusAcq/DevStatusWindow.cpp @@ -51,11 +51,25 @@ connect(freqTunDevice, &FrequencyTuning::sendDataToDraw, this, &DevStatusWindow::drawFrameDataOnPage); + + this->kafkaConsumer = new QKafkaConsumer(this); + + connect(kafkaConsumer, &QKafkaConsumer::messageRecieved, + this, &DevStatusWindow::devSettingCommandRecieved); + + kafkaConsumer->setBrokers("111.198.10.15:12502"); + kafkaConsumer->setTopic("cppTest"); + kafkaConsumer->createConsumer(); + kafkaConsumer->start(); + } DevStatusWindow::~DevStatusWindow() { + kafkaConsumer->deleteLater(); + delete ui; + delete kafkaConsumer; } void DevStatusWindow::drawFrameDataOnPage(DeviceFrameBaseDto * frameData) @@ -80,6 +94,67 @@ } } +void DevStatusWindow::devSettingCommandRecieved(QString commandJsonStr) +{ + std::cout << commandJsonStr.toStdString() << std::endl; + QJsonParseError jsonErr; + QJsonDocument doc = QJsonDocument::fromJson(commandJsonStr.toUtf8(), &jsonErr); + if (jsonErr.error == QJsonParseError::NoError) + { + QJsonObject commandJson = doc.object(); + QString deviceId; + QString deviceType; + QString command; + QString params; + if (commandJson.contains("deviceType") == true) + { + deviceType = commandJson.value("deviceType").toString(); + if (deviceType.isEmpty() == false) + { + for (int i = 0; i < ui->devTypeSelect->count(); i++) + { + if (ui->devTypeSelect->itemData(i).toString() == deviceType) + { + ui->devTypeSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("deviceId") == true) + { + deviceId = commandJson.value("deviceId").toString(); + if (deviceId.isEmpty() == false) + { + for (int i = 0; i < ui->devSelect->count(); i++) + { + if (ui->devSelect->itemData(i).toJsonObject().find("deviceId")->toString() == deviceId) + { + ui->devSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("command") == true) + { + command = commandJson.value("command").toString(); + } + if (commandJson.contains("params") == true) + { + params = commandJson.value("params").toString(); + } + + std::cout << deviceId.toStdString() << " " << deviceType.toStdString() << " " << command.toStdString() << " " << params.toStdString() << std::endl; + + if (deviceType == "03") + { + QByteArray commandBytes = freqTunDevice->protocol->generateSettingCommand(command, params); + freqTunDevice->sendDataToSerial(commandBytes); + } + } +} + int DevStatusWindow::initHttpToken() { QJsonObject response = httpReq->getTokenByClientId(SettingConfig::getInstance().CLIENT_ID, diff --git a/DevStatusAcq/DevStatusWindow.h b/DevStatusAcq/DevStatusWindow.h index d27b5c6..71668ae 100644 --- a/DevStatusAcq/DevStatusWindow.h +++ b/DevStatusAcq/DevStatusWindow.h @@ -11,6 +11,7 @@ #include "device/FreqReplicator.h" #include "device/BCodeTerminal.h" #include "common/HttpRequestController.h" +#include "common/utils/QKafkaConsumer.h" QT_BEGIN_NAMESPACE namespace Ui { class DevStatusWindow; } @@ -27,25 +28,20 @@ public slots: void drawFrameDataOnPage(DeviceFrameBaseDto * frameData); + void devSettingCommandRecieved(QString commandJsonStr); + private slots: void on_devTypeSelect_currentIndexChanged(int index); void on_sigGenButt_clicked(); - void on_freqTunButt_clicked(); - void on_tmSwiButt_clicked(); - void on_freqSwiButt_clicked(); - void on_tmRepButt_clicked(); - void on_freqRepButt_clicked(); - void on_bctButt_clicked(); void on_exitButt_clicked(); - void on_minButt_clicked(); void on_ftSetFreqTurnButt_clicked(); @@ -55,27 +51,16 @@ void on_ftSetPulseWidthButt_clicked(); void on_sgLeapSecondSetButt_clicked(); - void on_sgSingleSynchSetButt_clicked(); - void on_sgDateSetButt_clicked(); - void on_sgSecondWidthSetButt_clicked(); - void on_sgBacRatioSetButt_clicked(); - void on_sgBacRangeSetButt_clicked(); - void on_sgOppsPhaseShiftSetButt_clicked(); - void on_sgLeapTimestampSetButt_clicked(); - void on_sgMJDDateSetButt_clicked(); - void on_sgTimeSetButt_clicked(); - void on_sgKeyControlSetButt_clicked(); - void on_sgTimeTypeSetButt_clicked(); private: @@ -94,5 +79,7 @@ TimeReplicator * timeRepDevice; FreqReplicator * freqRepDevice; BCodeTerminal * bCodeTermDevice; + + QKafkaConsumer * kafkaConsumer; }; #endif // DEVSTATUSWINDOW_H diff --git a/DevStatusAcq/common/common.pri b/DevStatusAcq/common/common.pri index 6280f64..18b9b5d 100644 --- a/DevStatusAcq/common/common.pri +++ b/DevStatusAcq/common/common.pri @@ -1,5 +1,6 @@ -SOURCES += $$PWD/utils/SettingConfig.cpp +SOURCES += $$PWD/utils/SettingConfig.cpp \ + $$PWD/utils/QKafkaConsumer.cpp SOURCES += $$PWD/utils/QByteUtil.cpp SOURCES += $$PWD/utils/QSerialPortUtil.cpp SOURCES += $$PWD/utils/QLogUtil.cpp @@ -8,7 +9,8 @@ SOURCES += $$PWD/utils/MD5.cpp SOURCES += $$PWD/HttpRequestController.cpp -HEADERS += $$PWD/utils/SettingConfig.h +HEADERS += $$PWD/utils/SettingConfig.h \ + $$PWD/utils/QKafkaConsumer.h HEADERS += $$PWD/utils/QByteUtil.h HEADERS += $$PWD/utils/QSerialPortUtil.h HEADERS += $$PWD/utils/QLogUtil.h diff --git a/DevStatusAcq/DevStatusWindow.cpp b/DevStatusAcq/DevStatusWindow.cpp index 5e11c14..3d3b427 100644 --- a/DevStatusAcq/DevStatusWindow.cpp +++ b/DevStatusAcq/DevStatusWindow.cpp @@ -51,11 +51,25 @@ connect(freqTunDevice, &FrequencyTuning::sendDataToDraw, this, &DevStatusWindow::drawFrameDataOnPage); + + this->kafkaConsumer = new QKafkaConsumer(this); + + connect(kafkaConsumer, &QKafkaConsumer::messageRecieved, + this, &DevStatusWindow::devSettingCommandRecieved); + + kafkaConsumer->setBrokers("111.198.10.15:12502"); + kafkaConsumer->setTopic("cppTest"); + kafkaConsumer->createConsumer(); + kafkaConsumer->start(); + } DevStatusWindow::~DevStatusWindow() { + kafkaConsumer->deleteLater(); + delete ui; + delete kafkaConsumer; } void DevStatusWindow::drawFrameDataOnPage(DeviceFrameBaseDto * frameData) @@ -80,6 +94,67 @@ } } +void DevStatusWindow::devSettingCommandRecieved(QString commandJsonStr) +{ + std::cout << commandJsonStr.toStdString() << std::endl; + QJsonParseError jsonErr; + QJsonDocument doc = QJsonDocument::fromJson(commandJsonStr.toUtf8(), &jsonErr); + if (jsonErr.error == QJsonParseError::NoError) + { + QJsonObject commandJson = doc.object(); + QString deviceId; + QString deviceType; + QString command; + QString params; + if (commandJson.contains("deviceType") == true) + { + deviceType = commandJson.value("deviceType").toString(); + if (deviceType.isEmpty() == false) + { + for (int i = 0; i < ui->devTypeSelect->count(); i++) + { + if (ui->devTypeSelect->itemData(i).toString() == deviceType) + { + ui->devTypeSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("deviceId") == true) + { + deviceId = commandJson.value("deviceId").toString(); + if (deviceId.isEmpty() == false) + { + for (int i = 0; i < ui->devSelect->count(); i++) + { + if (ui->devSelect->itemData(i).toJsonObject().find("deviceId")->toString() == deviceId) + { + ui->devSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("command") == true) + { + command = commandJson.value("command").toString(); + } + if (commandJson.contains("params") == true) + { + params = commandJson.value("params").toString(); + } + + std::cout << deviceId.toStdString() << " " << deviceType.toStdString() << " " << command.toStdString() << " " << params.toStdString() << std::endl; + + if (deviceType == "03") + { + QByteArray commandBytes = freqTunDevice->protocol->generateSettingCommand(command, params); + freqTunDevice->sendDataToSerial(commandBytes); + } + } +} + int DevStatusWindow::initHttpToken() { QJsonObject response = httpReq->getTokenByClientId(SettingConfig::getInstance().CLIENT_ID, diff --git a/DevStatusAcq/DevStatusWindow.h b/DevStatusAcq/DevStatusWindow.h index d27b5c6..71668ae 100644 --- a/DevStatusAcq/DevStatusWindow.h +++ b/DevStatusAcq/DevStatusWindow.h @@ -11,6 +11,7 @@ #include "device/FreqReplicator.h" #include "device/BCodeTerminal.h" #include "common/HttpRequestController.h" +#include "common/utils/QKafkaConsumer.h" QT_BEGIN_NAMESPACE namespace Ui { class DevStatusWindow; } @@ -27,25 +28,20 @@ public slots: void drawFrameDataOnPage(DeviceFrameBaseDto * frameData); + void devSettingCommandRecieved(QString commandJsonStr); + private slots: void on_devTypeSelect_currentIndexChanged(int index); void on_sigGenButt_clicked(); - void on_freqTunButt_clicked(); - void on_tmSwiButt_clicked(); - void on_freqSwiButt_clicked(); - void on_tmRepButt_clicked(); - void on_freqRepButt_clicked(); - void on_bctButt_clicked(); void on_exitButt_clicked(); - void on_minButt_clicked(); void on_ftSetFreqTurnButt_clicked(); @@ -55,27 +51,16 @@ void on_ftSetPulseWidthButt_clicked(); void on_sgLeapSecondSetButt_clicked(); - void on_sgSingleSynchSetButt_clicked(); - void on_sgDateSetButt_clicked(); - void on_sgSecondWidthSetButt_clicked(); - void on_sgBacRatioSetButt_clicked(); - void on_sgBacRangeSetButt_clicked(); - void on_sgOppsPhaseShiftSetButt_clicked(); - void on_sgLeapTimestampSetButt_clicked(); - void on_sgMJDDateSetButt_clicked(); - void on_sgTimeSetButt_clicked(); - void on_sgKeyControlSetButt_clicked(); - void on_sgTimeTypeSetButt_clicked(); private: @@ -94,5 +79,7 @@ TimeReplicator * timeRepDevice; FreqReplicator * freqRepDevice; BCodeTerminal * bCodeTermDevice; + + QKafkaConsumer * kafkaConsumer; }; #endif // DEVSTATUSWINDOW_H diff --git a/DevStatusAcq/common/common.pri b/DevStatusAcq/common/common.pri index 6280f64..18b9b5d 100644 --- a/DevStatusAcq/common/common.pri +++ b/DevStatusAcq/common/common.pri @@ -1,5 +1,6 @@ -SOURCES += $$PWD/utils/SettingConfig.cpp +SOURCES += $$PWD/utils/SettingConfig.cpp \ + $$PWD/utils/QKafkaConsumer.cpp SOURCES += $$PWD/utils/QByteUtil.cpp SOURCES += $$PWD/utils/QSerialPortUtil.cpp SOURCES += $$PWD/utils/QLogUtil.cpp @@ -8,7 +9,8 @@ SOURCES += $$PWD/utils/MD5.cpp SOURCES += $$PWD/HttpRequestController.cpp -HEADERS += $$PWD/utils/SettingConfig.h +HEADERS += $$PWD/utils/SettingConfig.h \ + $$PWD/utils/QKafkaConsumer.h HEADERS += $$PWD/utils/QByteUtil.h HEADERS += $$PWD/utils/QSerialPortUtil.h HEADERS += $$PWD/utils/QLogUtil.h diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.cpp b/DevStatusAcq/common/utils/QKafkaConsumer.cpp new file mode 100644 index 0000000..2ce6d03 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.cpp @@ -0,0 +1,116 @@ +#include "QKafkaConsumer.h" +#include + +static volatile int runFlag = 1; +static bool exit_eof = false; + +QKafkaConsumer::QKafkaConsumer(QObject *parent) : QThread(parent) +{ + this->conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + this->tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + + conf->set("enable.partition.eof", "true", errStr); +} + +void QKafkaConsumer::setBrokers(QString brokers) +{ + this->brokers = brokers; +} +void QKafkaConsumer::setTopic(QString topic) +{ + this->topic = topic; +} + +int QKafkaConsumer::createConsumer() +{ + int ret = conf->set("metadata.broker.list", brokers.toStdString(), errStr); + if (ret != RdKafka::Conf::CONF_OK) + { + std::cerr << "RdKafka conf set brokerlist failed :" << errStr.c_str() << std::endl; + } + + consumer = RdKafka::Consumer::create(conf, errStr); + if (!consumer) { + std::cerr << "Failed to create consumer: " << errStr << std::endl; + return -1; + } + + std::cout << "% Created consumer " << consumer->name() << std::endl; + + return 1; +} + +void QKafkaConsumer::run() +{ + RdKafka::Topic * topic = RdKafka::Topic::create(consumer, this->topic.toStdString(), tconf, errStr); + if (!topic) { + std::cerr << "Failed to create topic: " << errStr << std::endl; + } + + RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_END); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl; + } + + while (runFlag) + { + RdKafka::Message * message = consumer->consume(topic, 0, 200); + messageConsume(message); + } +} + +void QKafkaConsumer::messageConsume(RdKafka::Message* message) { + const RdKafka::Headers *headers; + + switch (message->err()) { + case RdKafka::ERR__TIMED_OUT: + break; + + case RdKafka::ERR_NO_ERROR: + { + /* Real message */ +// std::cout << "Read msg at offset " << message->offset() << std::endl; +// printf("%.*s\n", static_cast(message->len()), static_cast(message->payload())); + + QString messageStr = static_cast(message->payload()); + emit messageRecieved(messageStr); + + if (message->key()) { + std::cout << "Key: " << *message->key() << std::endl; + } + headers = message->headers(); + if (headers) { + std::vector hdrs = headers->get_all(); + for (size_t i = 0 ; i < hdrs.size() ; i++) { + const RdKafka::Headers::Header hdr = hdrs[i]; + + if (hdr.value() != NULL) + printf(" Header: %s = \"%.*s\"\n", + hdr.key().c_str(), + (int)hdr.value_size(), (const char *)hdr.value()); + else + printf(" Header: %s = NULL\n", hdr.key().c_str()); + } + } + break; + } + + case RdKafka::ERR__PARTITION_EOF: + /* Last message */ + if (exit_eof) { + runFlag = 0; + } + break; + + case RdKafka::ERR__UNKNOWN_TOPIC: + case RdKafka::ERR__UNKNOWN_PARTITION: + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + break; + + default: + /* Errors */ + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + } +} diff --git a/DevStatusAcq/DevStatusWindow.cpp b/DevStatusAcq/DevStatusWindow.cpp index 5e11c14..3d3b427 100644 --- a/DevStatusAcq/DevStatusWindow.cpp +++ b/DevStatusAcq/DevStatusWindow.cpp @@ -51,11 +51,25 @@ connect(freqTunDevice, &FrequencyTuning::sendDataToDraw, this, &DevStatusWindow::drawFrameDataOnPage); + + this->kafkaConsumer = new QKafkaConsumer(this); + + connect(kafkaConsumer, &QKafkaConsumer::messageRecieved, + this, &DevStatusWindow::devSettingCommandRecieved); + + kafkaConsumer->setBrokers("111.198.10.15:12502"); + kafkaConsumer->setTopic("cppTest"); + kafkaConsumer->createConsumer(); + kafkaConsumer->start(); + } DevStatusWindow::~DevStatusWindow() { + kafkaConsumer->deleteLater(); + delete ui; + delete kafkaConsumer; } void DevStatusWindow::drawFrameDataOnPage(DeviceFrameBaseDto * frameData) @@ -80,6 +94,67 @@ } } +void DevStatusWindow::devSettingCommandRecieved(QString commandJsonStr) +{ + std::cout << commandJsonStr.toStdString() << std::endl; + QJsonParseError jsonErr; + QJsonDocument doc = QJsonDocument::fromJson(commandJsonStr.toUtf8(), &jsonErr); + if (jsonErr.error == QJsonParseError::NoError) + { + QJsonObject commandJson = doc.object(); + QString deviceId; + QString deviceType; + QString command; + QString params; + if (commandJson.contains("deviceType") == true) + { + deviceType = commandJson.value("deviceType").toString(); + if (deviceType.isEmpty() == false) + { + for (int i = 0; i < ui->devTypeSelect->count(); i++) + { + if (ui->devTypeSelect->itemData(i).toString() == deviceType) + { + ui->devTypeSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("deviceId") == true) + { + deviceId = commandJson.value("deviceId").toString(); + if (deviceId.isEmpty() == false) + { + for (int i = 0; i < ui->devSelect->count(); i++) + { + if (ui->devSelect->itemData(i).toJsonObject().find("deviceId")->toString() == deviceId) + { + ui->devSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("command") == true) + { + command = commandJson.value("command").toString(); + } + if (commandJson.contains("params") == true) + { + params = commandJson.value("params").toString(); + } + + std::cout << deviceId.toStdString() << " " << deviceType.toStdString() << " " << command.toStdString() << " " << params.toStdString() << std::endl; + + if (deviceType == "03") + { + QByteArray commandBytes = freqTunDevice->protocol->generateSettingCommand(command, params); + freqTunDevice->sendDataToSerial(commandBytes); + } + } +} + int DevStatusWindow::initHttpToken() { QJsonObject response = httpReq->getTokenByClientId(SettingConfig::getInstance().CLIENT_ID, diff --git a/DevStatusAcq/DevStatusWindow.h b/DevStatusAcq/DevStatusWindow.h index d27b5c6..71668ae 100644 --- a/DevStatusAcq/DevStatusWindow.h +++ b/DevStatusAcq/DevStatusWindow.h @@ -11,6 +11,7 @@ #include "device/FreqReplicator.h" #include "device/BCodeTerminal.h" #include "common/HttpRequestController.h" +#include "common/utils/QKafkaConsumer.h" QT_BEGIN_NAMESPACE namespace Ui { class DevStatusWindow; } @@ -27,25 +28,20 @@ public slots: void drawFrameDataOnPage(DeviceFrameBaseDto * frameData); + void devSettingCommandRecieved(QString commandJsonStr); + private slots: void on_devTypeSelect_currentIndexChanged(int index); void on_sigGenButt_clicked(); - void on_freqTunButt_clicked(); - void on_tmSwiButt_clicked(); - void on_freqSwiButt_clicked(); - void on_tmRepButt_clicked(); - void on_freqRepButt_clicked(); - void on_bctButt_clicked(); void on_exitButt_clicked(); - void on_minButt_clicked(); void on_ftSetFreqTurnButt_clicked(); @@ -55,27 +51,16 @@ void on_ftSetPulseWidthButt_clicked(); void on_sgLeapSecondSetButt_clicked(); - void on_sgSingleSynchSetButt_clicked(); - void on_sgDateSetButt_clicked(); - void on_sgSecondWidthSetButt_clicked(); - void on_sgBacRatioSetButt_clicked(); - void on_sgBacRangeSetButt_clicked(); - void on_sgOppsPhaseShiftSetButt_clicked(); - void on_sgLeapTimestampSetButt_clicked(); - void on_sgMJDDateSetButt_clicked(); - void on_sgTimeSetButt_clicked(); - void on_sgKeyControlSetButt_clicked(); - void on_sgTimeTypeSetButt_clicked(); private: @@ -94,5 +79,7 @@ TimeReplicator * timeRepDevice; FreqReplicator * freqRepDevice; BCodeTerminal * bCodeTermDevice; + + QKafkaConsumer * kafkaConsumer; }; #endif // DEVSTATUSWINDOW_H diff --git a/DevStatusAcq/common/common.pri b/DevStatusAcq/common/common.pri index 6280f64..18b9b5d 100644 --- a/DevStatusAcq/common/common.pri +++ b/DevStatusAcq/common/common.pri @@ -1,5 +1,6 @@ -SOURCES += $$PWD/utils/SettingConfig.cpp +SOURCES += $$PWD/utils/SettingConfig.cpp \ + $$PWD/utils/QKafkaConsumer.cpp SOURCES += $$PWD/utils/QByteUtil.cpp SOURCES += $$PWD/utils/QSerialPortUtil.cpp SOURCES += $$PWD/utils/QLogUtil.cpp @@ -8,7 +9,8 @@ SOURCES += $$PWD/utils/MD5.cpp SOURCES += $$PWD/HttpRequestController.cpp -HEADERS += $$PWD/utils/SettingConfig.h +HEADERS += $$PWD/utils/SettingConfig.h \ + $$PWD/utils/QKafkaConsumer.h HEADERS += $$PWD/utils/QByteUtil.h HEADERS += $$PWD/utils/QSerialPortUtil.h HEADERS += $$PWD/utils/QLogUtil.h diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.cpp b/DevStatusAcq/common/utils/QKafkaConsumer.cpp new file mode 100644 index 0000000..2ce6d03 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.cpp @@ -0,0 +1,116 @@ +#include "QKafkaConsumer.h" +#include + +static volatile int runFlag = 1; +static bool exit_eof = false; + +QKafkaConsumer::QKafkaConsumer(QObject *parent) : QThread(parent) +{ + this->conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + this->tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + + conf->set("enable.partition.eof", "true", errStr); +} + +void QKafkaConsumer::setBrokers(QString brokers) +{ + this->brokers = brokers; +} +void QKafkaConsumer::setTopic(QString topic) +{ + this->topic = topic; +} + +int QKafkaConsumer::createConsumer() +{ + int ret = conf->set("metadata.broker.list", brokers.toStdString(), errStr); + if (ret != RdKafka::Conf::CONF_OK) + { + std::cerr << "RdKafka conf set brokerlist failed :" << errStr.c_str() << std::endl; + } + + consumer = RdKafka::Consumer::create(conf, errStr); + if (!consumer) { + std::cerr << "Failed to create consumer: " << errStr << std::endl; + return -1; + } + + std::cout << "% Created consumer " << consumer->name() << std::endl; + + return 1; +} + +void QKafkaConsumer::run() +{ + RdKafka::Topic * topic = RdKafka::Topic::create(consumer, this->topic.toStdString(), tconf, errStr); + if (!topic) { + std::cerr << "Failed to create topic: " << errStr << std::endl; + } + + RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_END); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl; + } + + while (runFlag) + { + RdKafka::Message * message = consumer->consume(topic, 0, 200); + messageConsume(message); + } +} + +void QKafkaConsumer::messageConsume(RdKafka::Message* message) { + const RdKafka::Headers *headers; + + switch (message->err()) { + case RdKafka::ERR__TIMED_OUT: + break; + + case RdKafka::ERR_NO_ERROR: + { + /* Real message */ +// std::cout << "Read msg at offset " << message->offset() << std::endl; +// printf("%.*s\n", static_cast(message->len()), static_cast(message->payload())); + + QString messageStr = static_cast(message->payload()); + emit messageRecieved(messageStr); + + if (message->key()) { + std::cout << "Key: " << *message->key() << std::endl; + } + headers = message->headers(); + if (headers) { + std::vector hdrs = headers->get_all(); + for (size_t i = 0 ; i < hdrs.size() ; i++) { + const RdKafka::Headers::Header hdr = hdrs[i]; + + if (hdr.value() != NULL) + printf(" Header: %s = \"%.*s\"\n", + hdr.key().c_str(), + (int)hdr.value_size(), (const char *)hdr.value()); + else + printf(" Header: %s = NULL\n", hdr.key().c_str()); + } + } + break; + } + + case RdKafka::ERR__PARTITION_EOF: + /* Last message */ + if (exit_eof) { + runFlag = 0; + } + break; + + case RdKafka::ERR__UNKNOWN_TOPIC: + case RdKafka::ERR__UNKNOWN_PARTITION: + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + break; + + default: + /* Errors */ + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + } +} diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.h b/DevStatusAcq/common/utils/QKafkaConsumer.h new file mode 100644 index 0000000..f278676 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.h @@ -0,0 +1,38 @@ +#ifndef QKAFKACONSUMER_H +#define QKAFKACONSUMER_H + +#include + +#include "include/librdkafka/rdkafkacpp.h" + +class QKafkaConsumer : public QThread +{ + Q_OBJECT +public: + explicit QKafkaConsumer(QObject *parent = nullptr); + + void setBrokers(QString brokers); + void setTopic(QString topic); + + int createConsumer(); + void run(); + + void messageConsume(RdKafka::Message * message); + +private: + QString brokers; + QString topic; + + std::string errStr; + + RdKafka::Conf * conf; + RdKafka::Conf * tconf; + + RdKafka::Consumer * consumer = 0; + +signals: + void messageRecieved(QString message); + +}; + +#endif // QKAFKACONSUMER_H diff --git a/DevStatusAcq/DevStatusWindow.cpp b/DevStatusAcq/DevStatusWindow.cpp index 5e11c14..3d3b427 100644 --- a/DevStatusAcq/DevStatusWindow.cpp +++ b/DevStatusAcq/DevStatusWindow.cpp @@ -51,11 +51,25 @@ connect(freqTunDevice, &FrequencyTuning::sendDataToDraw, this, &DevStatusWindow::drawFrameDataOnPage); + + this->kafkaConsumer = new QKafkaConsumer(this); + + connect(kafkaConsumer, &QKafkaConsumer::messageRecieved, + this, &DevStatusWindow::devSettingCommandRecieved); + + kafkaConsumer->setBrokers("111.198.10.15:12502"); + kafkaConsumer->setTopic("cppTest"); + kafkaConsumer->createConsumer(); + kafkaConsumer->start(); + } DevStatusWindow::~DevStatusWindow() { + kafkaConsumer->deleteLater(); + delete ui; + delete kafkaConsumer; } void DevStatusWindow::drawFrameDataOnPage(DeviceFrameBaseDto * frameData) @@ -80,6 +94,67 @@ } } +void DevStatusWindow::devSettingCommandRecieved(QString commandJsonStr) +{ + std::cout << commandJsonStr.toStdString() << std::endl; + QJsonParseError jsonErr; + QJsonDocument doc = QJsonDocument::fromJson(commandJsonStr.toUtf8(), &jsonErr); + if (jsonErr.error == QJsonParseError::NoError) + { + QJsonObject commandJson = doc.object(); + QString deviceId; + QString deviceType; + QString command; + QString params; + if (commandJson.contains("deviceType") == true) + { + deviceType = commandJson.value("deviceType").toString(); + if (deviceType.isEmpty() == false) + { + for (int i = 0; i < ui->devTypeSelect->count(); i++) + { + if (ui->devTypeSelect->itemData(i).toString() == deviceType) + { + ui->devTypeSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("deviceId") == true) + { + deviceId = commandJson.value("deviceId").toString(); + if (deviceId.isEmpty() == false) + { + for (int i = 0; i < ui->devSelect->count(); i++) + { + if (ui->devSelect->itemData(i).toJsonObject().find("deviceId")->toString() == deviceId) + { + ui->devSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("command") == true) + { + command = commandJson.value("command").toString(); + } + if (commandJson.contains("params") == true) + { + params = commandJson.value("params").toString(); + } + + std::cout << deviceId.toStdString() << " " << deviceType.toStdString() << " " << command.toStdString() << " " << params.toStdString() << std::endl; + + if (deviceType == "03") + { + QByteArray commandBytes = freqTunDevice->protocol->generateSettingCommand(command, params); + freqTunDevice->sendDataToSerial(commandBytes); + } + } +} + int DevStatusWindow::initHttpToken() { QJsonObject response = httpReq->getTokenByClientId(SettingConfig::getInstance().CLIENT_ID, diff --git a/DevStatusAcq/DevStatusWindow.h b/DevStatusAcq/DevStatusWindow.h index d27b5c6..71668ae 100644 --- a/DevStatusAcq/DevStatusWindow.h +++ b/DevStatusAcq/DevStatusWindow.h @@ -11,6 +11,7 @@ #include "device/FreqReplicator.h" #include "device/BCodeTerminal.h" #include "common/HttpRequestController.h" +#include "common/utils/QKafkaConsumer.h" QT_BEGIN_NAMESPACE namespace Ui { class DevStatusWindow; } @@ -27,25 +28,20 @@ public slots: void drawFrameDataOnPage(DeviceFrameBaseDto * frameData); + void devSettingCommandRecieved(QString commandJsonStr); + private slots: void on_devTypeSelect_currentIndexChanged(int index); void on_sigGenButt_clicked(); - void on_freqTunButt_clicked(); - void on_tmSwiButt_clicked(); - void on_freqSwiButt_clicked(); - void on_tmRepButt_clicked(); - void on_freqRepButt_clicked(); - void on_bctButt_clicked(); void on_exitButt_clicked(); - void on_minButt_clicked(); void on_ftSetFreqTurnButt_clicked(); @@ -55,27 +51,16 @@ void on_ftSetPulseWidthButt_clicked(); void on_sgLeapSecondSetButt_clicked(); - void on_sgSingleSynchSetButt_clicked(); - void on_sgDateSetButt_clicked(); - void on_sgSecondWidthSetButt_clicked(); - void on_sgBacRatioSetButt_clicked(); - void on_sgBacRangeSetButt_clicked(); - void on_sgOppsPhaseShiftSetButt_clicked(); - void on_sgLeapTimestampSetButt_clicked(); - void on_sgMJDDateSetButt_clicked(); - void on_sgTimeSetButt_clicked(); - void on_sgKeyControlSetButt_clicked(); - void on_sgTimeTypeSetButt_clicked(); private: @@ -94,5 +79,7 @@ TimeReplicator * timeRepDevice; FreqReplicator * freqRepDevice; BCodeTerminal * bCodeTermDevice; + + QKafkaConsumer * kafkaConsumer; }; #endif // DEVSTATUSWINDOW_H diff --git a/DevStatusAcq/common/common.pri b/DevStatusAcq/common/common.pri index 6280f64..18b9b5d 100644 --- a/DevStatusAcq/common/common.pri +++ b/DevStatusAcq/common/common.pri @@ -1,5 +1,6 @@ -SOURCES += $$PWD/utils/SettingConfig.cpp +SOURCES += $$PWD/utils/SettingConfig.cpp \ + $$PWD/utils/QKafkaConsumer.cpp SOURCES += $$PWD/utils/QByteUtil.cpp SOURCES += $$PWD/utils/QSerialPortUtil.cpp SOURCES += $$PWD/utils/QLogUtil.cpp @@ -8,7 +9,8 @@ SOURCES += $$PWD/utils/MD5.cpp SOURCES += $$PWD/HttpRequestController.cpp -HEADERS += $$PWD/utils/SettingConfig.h +HEADERS += $$PWD/utils/SettingConfig.h \ + $$PWD/utils/QKafkaConsumer.h HEADERS += $$PWD/utils/QByteUtil.h HEADERS += $$PWD/utils/QSerialPortUtil.h HEADERS += $$PWD/utils/QLogUtil.h diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.cpp b/DevStatusAcq/common/utils/QKafkaConsumer.cpp new file mode 100644 index 0000000..2ce6d03 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.cpp @@ -0,0 +1,116 @@ +#include "QKafkaConsumer.h" +#include + +static volatile int runFlag = 1; +static bool exit_eof = false; + +QKafkaConsumer::QKafkaConsumer(QObject *parent) : QThread(parent) +{ + this->conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + this->tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + + conf->set("enable.partition.eof", "true", errStr); +} + +void QKafkaConsumer::setBrokers(QString brokers) +{ + this->brokers = brokers; +} +void QKafkaConsumer::setTopic(QString topic) +{ + this->topic = topic; +} + +int QKafkaConsumer::createConsumer() +{ + int ret = conf->set("metadata.broker.list", brokers.toStdString(), errStr); + if (ret != RdKafka::Conf::CONF_OK) + { + std::cerr << "RdKafka conf set brokerlist failed :" << errStr.c_str() << std::endl; + } + + consumer = RdKafka::Consumer::create(conf, errStr); + if (!consumer) { + std::cerr << "Failed to create consumer: " << errStr << std::endl; + return -1; + } + + std::cout << "% Created consumer " << consumer->name() << std::endl; + + return 1; +} + +void QKafkaConsumer::run() +{ + RdKafka::Topic * topic = RdKafka::Topic::create(consumer, this->topic.toStdString(), tconf, errStr); + if (!topic) { + std::cerr << "Failed to create topic: " << errStr << std::endl; + } + + RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_END); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl; + } + + while (runFlag) + { + RdKafka::Message * message = consumer->consume(topic, 0, 200); + messageConsume(message); + } +} + +void QKafkaConsumer::messageConsume(RdKafka::Message* message) { + const RdKafka::Headers *headers; + + switch (message->err()) { + case RdKafka::ERR__TIMED_OUT: + break; + + case RdKafka::ERR_NO_ERROR: + { + /* Real message */ +// std::cout << "Read msg at offset " << message->offset() << std::endl; +// printf("%.*s\n", static_cast(message->len()), static_cast(message->payload())); + + QString messageStr = static_cast(message->payload()); + emit messageRecieved(messageStr); + + if (message->key()) { + std::cout << "Key: " << *message->key() << std::endl; + } + headers = message->headers(); + if (headers) { + std::vector hdrs = headers->get_all(); + for (size_t i = 0 ; i < hdrs.size() ; i++) { + const RdKafka::Headers::Header hdr = hdrs[i]; + + if (hdr.value() != NULL) + printf(" Header: %s = \"%.*s\"\n", + hdr.key().c_str(), + (int)hdr.value_size(), (const char *)hdr.value()); + else + printf(" Header: %s = NULL\n", hdr.key().c_str()); + } + } + break; + } + + case RdKafka::ERR__PARTITION_EOF: + /* Last message */ + if (exit_eof) { + runFlag = 0; + } + break; + + case RdKafka::ERR__UNKNOWN_TOPIC: + case RdKafka::ERR__UNKNOWN_PARTITION: + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + break; + + default: + /* Errors */ + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + } +} diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.h b/DevStatusAcq/common/utils/QKafkaConsumer.h new file mode 100644 index 0000000..f278676 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.h @@ -0,0 +1,38 @@ +#ifndef QKAFKACONSUMER_H +#define QKAFKACONSUMER_H + +#include + +#include "include/librdkafka/rdkafkacpp.h" + +class QKafkaConsumer : public QThread +{ + Q_OBJECT +public: + explicit QKafkaConsumer(QObject *parent = nullptr); + + void setBrokers(QString brokers); + void setTopic(QString topic); + + int createConsumer(); + void run(); + + void messageConsume(RdKafka::Message * message); + +private: + QString brokers; + QString topic; + + std::string errStr; + + RdKafka::Conf * conf; + RdKafka::Conf * tconf; + + RdKafka::Consumer * consumer = 0; + +signals: + void messageRecieved(QString message); + +}; + +#endif // QKAFKACONSUMER_H diff --git a/DevStatusAcq/common/utils/QKafkaUtil.cpp b/DevStatusAcq/common/utils/QKafkaUtil.cpp index 3f00452..1d7310f 100644 --- a/DevStatusAcq/common/utils/QKafkaUtil.cpp +++ b/DevStatusAcq/common/utils/QKafkaUtil.cpp @@ -41,6 +41,25 @@ int QKafkaUtil::produceMessage(QString message) { + auto retCode = producer->produce(this->topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, + const_cast(message.toStdString().c_str()), message.size(), + nullptr, 0, 0, nullptr, nullptr); + + if (retCode != RdKafka::ERR_NO_ERROR) + { + std::cerr << "Failed to produce to topic " << topic.toStdString() << ": " << + RdKafka::err2str(retCode) << std::endl; + } else + { + std::cerr << "Enqueued message (" << message.size() << " bytes) " << + "for topic " << topic.toStdString() << "[" << message.toStdString() <<"]" << std::endl; + } + + return retCode; +} + +int QKafkaUtil::produceMessage(QString topic, QString message) +{ auto retCode = producer->produce(topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast(message.toStdString().c_str()), message.size(), nullptr, 0, 0, nullptr, nullptr); diff --git a/DevStatusAcq/DevStatusWindow.cpp b/DevStatusAcq/DevStatusWindow.cpp index 5e11c14..3d3b427 100644 --- a/DevStatusAcq/DevStatusWindow.cpp +++ b/DevStatusAcq/DevStatusWindow.cpp @@ -51,11 +51,25 @@ connect(freqTunDevice, &FrequencyTuning::sendDataToDraw, this, &DevStatusWindow::drawFrameDataOnPage); + + this->kafkaConsumer = new QKafkaConsumer(this); + + connect(kafkaConsumer, &QKafkaConsumer::messageRecieved, + this, &DevStatusWindow::devSettingCommandRecieved); + + kafkaConsumer->setBrokers("111.198.10.15:12502"); + kafkaConsumer->setTopic("cppTest"); + kafkaConsumer->createConsumer(); + kafkaConsumer->start(); + } DevStatusWindow::~DevStatusWindow() { + kafkaConsumer->deleteLater(); + delete ui; + delete kafkaConsumer; } void DevStatusWindow::drawFrameDataOnPage(DeviceFrameBaseDto * frameData) @@ -80,6 +94,67 @@ } } +void DevStatusWindow::devSettingCommandRecieved(QString commandJsonStr) +{ + std::cout << commandJsonStr.toStdString() << std::endl; + QJsonParseError jsonErr; + QJsonDocument doc = QJsonDocument::fromJson(commandJsonStr.toUtf8(), &jsonErr); + if (jsonErr.error == QJsonParseError::NoError) + { + QJsonObject commandJson = doc.object(); + QString deviceId; + QString deviceType; + QString command; + QString params; + if (commandJson.contains("deviceType") == true) + { + deviceType = commandJson.value("deviceType").toString(); + if (deviceType.isEmpty() == false) + { + for (int i = 0; i < ui->devTypeSelect->count(); i++) + { + if (ui->devTypeSelect->itemData(i).toString() == deviceType) + { + ui->devTypeSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("deviceId") == true) + { + deviceId = commandJson.value("deviceId").toString(); + if (deviceId.isEmpty() == false) + { + for (int i = 0; i < ui->devSelect->count(); i++) + { + if (ui->devSelect->itemData(i).toJsonObject().find("deviceId")->toString() == deviceId) + { + ui->devSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("command") == true) + { + command = commandJson.value("command").toString(); + } + if (commandJson.contains("params") == true) + { + params = commandJson.value("params").toString(); + } + + std::cout << deviceId.toStdString() << " " << deviceType.toStdString() << " " << command.toStdString() << " " << params.toStdString() << std::endl; + + if (deviceType == "03") + { + QByteArray commandBytes = freqTunDevice->protocol->generateSettingCommand(command, params); + freqTunDevice->sendDataToSerial(commandBytes); + } + } +} + int DevStatusWindow::initHttpToken() { QJsonObject response = httpReq->getTokenByClientId(SettingConfig::getInstance().CLIENT_ID, diff --git a/DevStatusAcq/DevStatusWindow.h b/DevStatusAcq/DevStatusWindow.h index d27b5c6..71668ae 100644 --- a/DevStatusAcq/DevStatusWindow.h +++ b/DevStatusAcq/DevStatusWindow.h @@ -11,6 +11,7 @@ #include "device/FreqReplicator.h" #include "device/BCodeTerminal.h" #include "common/HttpRequestController.h" +#include "common/utils/QKafkaConsumer.h" QT_BEGIN_NAMESPACE namespace Ui { class DevStatusWindow; } @@ -27,25 +28,20 @@ public slots: void drawFrameDataOnPage(DeviceFrameBaseDto * frameData); + void devSettingCommandRecieved(QString commandJsonStr); + private slots: void on_devTypeSelect_currentIndexChanged(int index); void on_sigGenButt_clicked(); - void on_freqTunButt_clicked(); - void on_tmSwiButt_clicked(); - void on_freqSwiButt_clicked(); - void on_tmRepButt_clicked(); - void on_freqRepButt_clicked(); - void on_bctButt_clicked(); void on_exitButt_clicked(); - void on_minButt_clicked(); void on_ftSetFreqTurnButt_clicked(); @@ -55,27 +51,16 @@ void on_ftSetPulseWidthButt_clicked(); void on_sgLeapSecondSetButt_clicked(); - void on_sgSingleSynchSetButt_clicked(); - void on_sgDateSetButt_clicked(); - void on_sgSecondWidthSetButt_clicked(); - void on_sgBacRatioSetButt_clicked(); - void on_sgBacRangeSetButt_clicked(); - void on_sgOppsPhaseShiftSetButt_clicked(); - void on_sgLeapTimestampSetButt_clicked(); - void on_sgMJDDateSetButt_clicked(); - void on_sgTimeSetButt_clicked(); - void on_sgKeyControlSetButt_clicked(); - void on_sgTimeTypeSetButt_clicked(); private: @@ -94,5 +79,7 @@ TimeReplicator * timeRepDevice; FreqReplicator * freqRepDevice; BCodeTerminal * bCodeTermDevice; + + QKafkaConsumer * kafkaConsumer; }; #endif // DEVSTATUSWINDOW_H diff --git a/DevStatusAcq/common/common.pri b/DevStatusAcq/common/common.pri index 6280f64..18b9b5d 100644 --- a/DevStatusAcq/common/common.pri +++ b/DevStatusAcq/common/common.pri @@ -1,5 +1,6 @@ -SOURCES += $$PWD/utils/SettingConfig.cpp +SOURCES += $$PWD/utils/SettingConfig.cpp \ + $$PWD/utils/QKafkaConsumer.cpp SOURCES += $$PWD/utils/QByteUtil.cpp SOURCES += $$PWD/utils/QSerialPortUtil.cpp SOURCES += $$PWD/utils/QLogUtil.cpp @@ -8,7 +9,8 @@ SOURCES += $$PWD/utils/MD5.cpp SOURCES += $$PWD/HttpRequestController.cpp -HEADERS += $$PWD/utils/SettingConfig.h +HEADERS += $$PWD/utils/SettingConfig.h \ + $$PWD/utils/QKafkaConsumer.h HEADERS += $$PWD/utils/QByteUtil.h HEADERS += $$PWD/utils/QSerialPortUtil.h HEADERS += $$PWD/utils/QLogUtil.h diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.cpp b/DevStatusAcq/common/utils/QKafkaConsumer.cpp new file mode 100644 index 0000000..2ce6d03 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.cpp @@ -0,0 +1,116 @@ +#include "QKafkaConsumer.h" +#include + +static volatile int runFlag = 1; +static bool exit_eof = false; + +QKafkaConsumer::QKafkaConsumer(QObject *parent) : QThread(parent) +{ + this->conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + this->tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + + conf->set("enable.partition.eof", "true", errStr); +} + +void QKafkaConsumer::setBrokers(QString brokers) +{ + this->brokers = brokers; +} +void QKafkaConsumer::setTopic(QString topic) +{ + this->topic = topic; +} + +int QKafkaConsumer::createConsumer() +{ + int ret = conf->set("metadata.broker.list", brokers.toStdString(), errStr); + if (ret != RdKafka::Conf::CONF_OK) + { + std::cerr << "RdKafka conf set brokerlist failed :" << errStr.c_str() << std::endl; + } + + consumer = RdKafka::Consumer::create(conf, errStr); + if (!consumer) { + std::cerr << "Failed to create consumer: " << errStr << std::endl; + return -1; + } + + std::cout << "% Created consumer " << consumer->name() << std::endl; + + return 1; +} + +void QKafkaConsumer::run() +{ + RdKafka::Topic * topic = RdKafka::Topic::create(consumer, this->topic.toStdString(), tconf, errStr); + if (!topic) { + std::cerr << "Failed to create topic: " << errStr << std::endl; + } + + RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_END); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl; + } + + while (runFlag) + { + RdKafka::Message * message = consumer->consume(topic, 0, 200); + messageConsume(message); + } +} + +void QKafkaConsumer::messageConsume(RdKafka::Message* message) { + const RdKafka::Headers *headers; + + switch (message->err()) { + case RdKafka::ERR__TIMED_OUT: + break; + + case RdKafka::ERR_NO_ERROR: + { + /* Real message */ +// std::cout << "Read msg at offset " << message->offset() << std::endl; +// printf("%.*s\n", static_cast(message->len()), static_cast(message->payload())); + + QString messageStr = static_cast(message->payload()); + emit messageRecieved(messageStr); + + if (message->key()) { + std::cout << "Key: " << *message->key() << std::endl; + } + headers = message->headers(); + if (headers) { + std::vector hdrs = headers->get_all(); + for (size_t i = 0 ; i < hdrs.size() ; i++) { + const RdKafka::Headers::Header hdr = hdrs[i]; + + if (hdr.value() != NULL) + printf(" Header: %s = \"%.*s\"\n", + hdr.key().c_str(), + (int)hdr.value_size(), (const char *)hdr.value()); + else + printf(" Header: %s = NULL\n", hdr.key().c_str()); + } + } + break; + } + + case RdKafka::ERR__PARTITION_EOF: + /* Last message */ + if (exit_eof) { + runFlag = 0; + } + break; + + case RdKafka::ERR__UNKNOWN_TOPIC: + case RdKafka::ERR__UNKNOWN_PARTITION: + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + break; + + default: + /* Errors */ + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + } +} diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.h b/DevStatusAcq/common/utils/QKafkaConsumer.h new file mode 100644 index 0000000..f278676 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.h @@ -0,0 +1,38 @@ +#ifndef QKAFKACONSUMER_H +#define QKAFKACONSUMER_H + +#include + +#include "include/librdkafka/rdkafkacpp.h" + +class QKafkaConsumer : public QThread +{ + Q_OBJECT +public: + explicit QKafkaConsumer(QObject *parent = nullptr); + + void setBrokers(QString brokers); + void setTopic(QString topic); + + int createConsumer(); + void run(); + + void messageConsume(RdKafka::Message * message); + +private: + QString brokers; + QString topic; + + std::string errStr; + + RdKafka::Conf * conf; + RdKafka::Conf * tconf; + + RdKafka::Consumer * consumer = 0; + +signals: + void messageRecieved(QString message); + +}; + +#endif // QKAFKACONSUMER_H diff --git a/DevStatusAcq/common/utils/QKafkaUtil.cpp b/DevStatusAcq/common/utils/QKafkaUtil.cpp index 3f00452..1d7310f 100644 --- a/DevStatusAcq/common/utils/QKafkaUtil.cpp +++ b/DevStatusAcq/common/utils/QKafkaUtil.cpp @@ -41,6 +41,25 @@ int QKafkaUtil::produceMessage(QString message) { + auto retCode = producer->produce(this->topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, + const_cast(message.toStdString().c_str()), message.size(), + nullptr, 0, 0, nullptr, nullptr); + + if (retCode != RdKafka::ERR_NO_ERROR) + { + std::cerr << "Failed to produce to topic " << topic.toStdString() << ": " << + RdKafka::err2str(retCode) << std::endl; + } else + { + std::cerr << "Enqueued message (" << message.size() << " bytes) " << + "for topic " << topic.toStdString() << "[" << message.toStdString() <<"]" << std::endl; + } + + return retCode; +} + +int QKafkaUtil::produceMessage(QString topic, QString message) +{ auto retCode = producer->produce(topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast(message.toStdString().c_str()), message.size(), nullptr, 0, 0, nullptr, nullptr); diff --git a/DevStatusAcq/common/utils/QKafkaUtil.h b/DevStatusAcq/common/utils/QKafkaUtil.h index 289ea6f..de63820 100644 --- a/DevStatusAcq/common/utils/QKafkaUtil.h +++ b/DevStatusAcq/common/utils/QKafkaUtil.h @@ -16,6 +16,7 @@ int createProducer(); int produceMessage(QString message); + int produceMessage(QString topic, QString message); private: QString brokers; diff --git a/DevStatusAcq/DevStatusWindow.cpp b/DevStatusAcq/DevStatusWindow.cpp index 5e11c14..3d3b427 100644 --- a/DevStatusAcq/DevStatusWindow.cpp +++ b/DevStatusAcq/DevStatusWindow.cpp @@ -51,11 +51,25 @@ connect(freqTunDevice, &FrequencyTuning::sendDataToDraw, this, &DevStatusWindow::drawFrameDataOnPage); + + this->kafkaConsumer = new QKafkaConsumer(this); + + connect(kafkaConsumer, &QKafkaConsumer::messageRecieved, + this, &DevStatusWindow::devSettingCommandRecieved); + + kafkaConsumer->setBrokers("111.198.10.15:12502"); + kafkaConsumer->setTopic("cppTest"); + kafkaConsumer->createConsumer(); + kafkaConsumer->start(); + } DevStatusWindow::~DevStatusWindow() { + kafkaConsumer->deleteLater(); + delete ui; + delete kafkaConsumer; } void DevStatusWindow::drawFrameDataOnPage(DeviceFrameBaseDto * frameData) @@ -80,6 +94,67 @@ } } +void DevStatusWindow::devSettingCommandRecieved(QString commandJsonStr) +{ + std::cout << commandJsonStr.toStdString() << std::endl; + QJsonParseError jsonErr; + QJsonDocument doc = QJsonDocument::fromJson(commandJsonStr.toUtf8(), &jsonErr); + if (jsonErr.error == QJsonParseError::NoError) + { + QJsonObject commandJson = doc.object(); + QString deviceId; + QString deviceType; + QString command; + QString params; + if (commandJson.contains("deviceType") == true) + { + deviceType = commandJson.value("deviceType").toString(); + if (deviceType.isEmpty() == false) + { + for (int i = 0; i < ui->devTypeSelect->count(); i++) + { + if (ui->devTypeSelect->itemData(i).toString() == deviceType) + { + ui->devTypeSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("deviceId") == true) + { + deviceId = commandJson.value("deviceId").toString(); + if (deviceId.isEmpty() == false) + { + for (int i = 0; i < ui->devSelect->count(); i++) + { + if (ui->devSelect->itemData(i).toJsonObject().find("deviceId")->toString() == deviceId) + { + ui->devSelect->setCurrentIndex(i); + } + } + } + } + + if (commandJson.contains("command") == true) + { + command = commandJson.value("command").toString(); + } + if (commandJson.contains("params") == true) + { + params = commandJson.value("params").toString(); + } + + std::cout << deviceId.toStdString() << " " << deviceType.toStdString() << " " << command.toStdString() << " " << params.toStdString() << std::endl; + + if (deviceType == "03") + { + QByteArray commandBytes = freqTunDevice->protocol->generateSettingCommand(command, params); + freqTunDevice->sendDataToSerial(commandBytes); + } + } +} + int DevStatusWindow::initHttpToken() { QJsonObject response = httpReq->getTokenByClientId(SettingConfig::getInstance().CLIENT_ID, diff --git a/DevStatusAcq/DevStatusWindow.h b/DevStatusAcq/DevStatusWindow.h index d27b5c6..71668ae 100644 --- a/DevStatusAcq/DevStatusWindow.h +++ b/DevStatusAcq/DevStatusWindow.h @@ -11,6 +11,7 @@ #include "device/FreqReplicator.h" #include "device/BCodeTerminal.h" #include "common/HttpRequestController.h" +#include "common/utils/QKafkaConsumer.h" QT_BEGIN_NAMESPACE namespace Ui { class DevStatusWindow; } @@ -27,25 +28,20 @@ public slots: void drawFrameDataOnPage(DeviceFrameBaseDto * frameData); + void devSettingCommandRecieved(QString commandJsonStr); + private slots: void on_devTypeSelect_currentIndexChanged(int index); void on_sigGenButt_clicked(); - void on_freqTunButt_clicked(); - void on_tmSwiButt_clicked(); - void on_freqSwiButt_clicked(); - void on_tmRepButt_clicked(); - void on_freqRepButt_clicked(); - void on_bctButt_clicked(); void on_exitButt_clicked(); - void on_minButt_clicked(); void on_ftSetFreqTurnButt_clicked(); @@ -55,27 +51,16 @@ void on_ftSetPulseWidthButt_clicked(); void on_sgLeapSecondSetButt_clicked(); - void on_sgSingleSynchSetButt_clicked(); - void on_sgDateSetButt_clicked(); - void on_sgSecondWidthSetButt_clicked(); - void on_sgBacRatioSetButt_clicked(); - void on_sgBacRangeSetButt_clicked(); - void on_sgOppsPhaseShiftSetButt_clicked(); - void on_sgLeapTimestampSetButt_clicked(); - void on_sgMJDDateSetButt_clicked(); - void on_sgTimeSetButt_clicked(); - void on_sgKeyControlSetButt_clicked(); - void on_sgTimeTypeSetButt_clicked(); private: @@ -94,5 +79,7 @@ TimeReplicator * timeRepDevice; FreqReplicator * freqRepDevice; BCodeTerminal * bCodeTermDevice; + + QKafkaConsumer * kafkaConsumer; }; #endif // DEVSTATUSWINDOW_H diff --git a/DevStatusAcq/common/common.pri b/DevStatusAcq/common/common.pri index 6280f64..18b9b5d 100644 --- a/DevStatusAcq/common/common.pri +++ b/DevStatusAcq/common/common.pri @@ -1,5 +1,6 @@ -SOURCES += $$PWD/utils/SettingConfig.cpp +SOURCES += $$PWD/utils/SettingConfig.cpp \ + $$PWD/utils/QKafkaConsumer.cpp SOURCES += $$PWD/utils/QByteUtil.cpp SOURCES += $$PWD/utils/QSerialPortUtil.cpp SOURCES += $$PWD/utils/QLogUtil.cpp @@ -8,7 +9,8 @@ SOURCES += $$PWD/utils/MD5.cpp SOURCES += $$PWD/HttpRequestController.cpp -HEADERS += $$PWD/utils/SettingConfig.h +HEADERS += $$PWD/utils/SettingConfig.h \ + $$PWD/utils/QKafkaConsumer.h HEADERS += $$PWD/utils/QByteUtil.h HEADERS += $$PWD/utils/QSerialPortUtil.h HEADERS += $$PWD/utils/QLogUtil.h diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.cpp b/DevStatusAcq/common/utils/QKafkaConsumer.cpp new file mode 100644 index 0000000..2ce6d03 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.cpp @@ -0,0 +1,116 @@ +#include "QKafkaConsumer.h" +#include + +static volatile int runFlag = 1; +static bool exit_eof = false; + +QKafkaConsumer::QKafkaConsumer(QObject *parent) : QThread(parent) +{ + this->conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + this->tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + + conf->set("enable.partition.eof", "true", errStr); +} + +void QKafkaConsumer::setBrokers(QString brokers) +{ + this->brokers = brokers; +} +void QKafkaConsumer::setTopic(QString topic) +{ + this->topic = topic; +} + +int QKafkaConsumer::createConsumer() +{ + int ret = conf->set("metadata.broker.list", brokers.toStdString(), errStr); + if (ret != RdKafka::Conf::CONF_OK) + { + std::cerr << "RdKafka conf set brokerlist failed :" << errStr.c_str() << std::endl; + } + + consumer = RdKafka::Consumer::create(conf, errStr); + if (!consumer) { + std::cerr << "Failed to create consumer: " << errStr << std::endl; + return -1; + } + + std::cout << "% Created consumer " << consumer->name() << std::endl; + + return 1; +} + +void QKafkaConsumer::run() +{ + RdKafka::Topic * topic = RdKafka::Topic::create(consumer, this->topic.toStdString(), tconf, errStr); + if (!topic) { + std::cerr << "Failed to create topic: " << errStr << std::endl; + } + + RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_END); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl; + } + + while (runFlag) + { + RdKafka::Message * message = consumer->consume(topic, 0, 200); + messageConsume(message); + } +} + +void QKafkaConsumer::messageConsume(RdKafka::Message* message) { + const RdKafka::Headers *headers; + + switch (message->err()) { + case RdKafka::ERR__TIMED_OUT: + break; + + case RdKafka::ERR_NO_ERROR: + { + /* Real message */ +// std::cout << "Read msg at offset " << message->offset() << std::endl; +// printf("%.*s\n", static_cast(message->len()), static_cast(message->payload())); + + QString messageStr = static_cast(message->payload()); + emit messageRecieved(messageStr); + + if (message->key()) { + std::cout << "Key: " << *message->key() << std::endl; + } + headers = message->headers(); + if (headers) { + std::vector hdrs = headers->get_all(); + for (size_t i = 0 ; i < hdrs.size() ; i++) { + const RdKafka::Headers::Header hdr = hdrs[i]; + + if (hdr.value() != NULL) + printf(" Header: %s = \"%.*s\"\n", + hdr.key().c_str(), + (int)hdr.value_size(), (const char *)hdr.value()); + else + printf(" Header: %s = NULL\n", hdr.key().c_str()); + } + } + break; + } + + case RdKafka::ERR__PARTITION_EOF: + /* Last message */ + if (exit_eof) { + runFlag = 0; + } + break; + + case RdKafka::ERR__UNKNOWN_TOPIC: + case RdKafka::ERR__UNKNOWN_PARTITION: + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + break; + + default: + /* Errors */ + std::cerr << "Consume failed: " << message->errstr() << std::endl; + runFlag = 0; + } +} diff --git a/DevStatusAcq/common/utils/QKafkaConsumer.h b/DevStatusAcq/common/utils/QKafkaConsumer.h new file mode 100644 index 0000000..f278676 --- /dev/null +++ b/DevStatusAcq/common/utils/QKafkaConsumer.h @@ -0,0 +1,38 @@ +#ifndef QKAFKACONSUMER_H +#define QKAFKACONSUMER_H + +#include + +#include "include/librdkafka/rdkafkacpp.h" + +class QKafkaConsumer : public QThread +{ + Q_OBJECT +public: + explicit QKafkaConsumer(QObject *parent = nullptr); + + void setBrokers(QString brokers); + void setTopic(QString topic); + + int createConsumer(); + void run(); + + void messageConsume(RdKafka::Message * message); + +private: + QString brokers; + QString topic; + + std::string errStr; + + RdKafka::Conf * conf; + RdKafka::Conf * tconf; + + RdKafka::Consumer * consumer = 0; + +signals: + void messageRecieved(QString message); + +}; + +#endif // QKAFKACONSUMER_H diff --git a/DevStatusAcq/common/utils/QKafkaUtil.cpp b/DevStatusAcq/common/utils/QKafkaUtil.cpp index 3f00452..1d7310f 100644 --- a/DevStatusAcq/common/utils/QKafkaUtil.cpp +++ b/DevStatusAcq/common/utils/QKafkaUtil.cpp @@ -41,6 +41,25 @@ int QKafkaUtil::produceMessage(QString message) { + auto retCode = producer->produce(this->topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, + const_cast(message.toStdString().c_str()), message.size(), + nullptr, 0, 0, nullptr, nullptr); + + if (retCode != RdKafka::ERR_NO_ERROR) + { + std::cerr << "Failed to produce to topic " << topic.toStdString() << ": " << + RdKafka::err2str(retCode) << std::endl; + } else + { + std::cerr << "Enqueued message (" << message.size() << " bytes) " << + "for topic " << topic.toStdString() << "[" << message.toStdString() <<"]" << std::endl; + } + + return retCode; +} + +int QKafkaUtil::produceMessage(QString topic, QString message) +{ auto retCode = producer->produce(topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast(message.toStdString().c_str()), message.size(), nullptr, 0, 0, nullptr, nullptr); diff --git a/DevStatusAcq/common/utils/QKafkaUtil.h b/DevStatusAcq/common/utils/QKafkaUtil.h index 289ea6f..de63820 100644 --- a/DevStatusAcq/common/utils/QKafkaUtil.h +++ b/DevStatusAcq/common/utils/QKafkaUtil.h @@ -16,6 +16,7 @@ int createProducer(); int produceMessage(QString message); + int produceMessage(QString topic, QString message); private: QString brokers; diff --git a/DevStatusAcq/common/utils/QSerialPortUtil.cpp b/DevStatusAcq/common/utils/QSerialPortUtil.cpp index df48453..0138158 100644 --- a/DevStatusAcq/common/utils/QSerialPortUtil.cpp +++ b/DevStatusAcq/common/utils/QSerialPortUtil.cpp @@ -12,6 +12,10 @@ serial.setParity(QSerialPort::NoParity); serial.setStopBits(QSerialPort::OneStop); serial.setFlowControl(QSerialPort::NoFlowControl); + + // 绑定信号与槽 + connect(&serial, &QSerialPort::readyRead, + this, &QSerialPortUtil::readData); } void QSerialPortUtil::openSerialPort(QString portName, int baudRate) @@ -23,10 +27,6 @@ // if (open == true) // { - // 绑定信号与槽 - connect(&serial, &QSerialPort::readyRead, - this, &QSerialPortUtil::readData); - // mock data received per second // QTimer * timer = new QTimer(this); // connect(timer, &QTimer::timeout, @@ -62,6 +62,7 @@ void QSerialPortUtil::mockReceivData(QString portName) { QByteArray buffer; + buffer.clear(); if (portName == "SignalGenerator") {