diff --git a/PhaseCompAcq/PhaseCompAcq.pro b/PhaseCompAcq/PhaseCompAcq.pro index 7da4dd1..5eef2d1 100644 --- a/PhaseCompAcq/PhaseCompAcq.pro +++ b/PhaseCompAcq/PhaseCompAcq.pro @@ -29,10 +29,9 @@ !isEmpty(target.path): INSTALLS += target -win32: LIBS += -L$$PWD/librdkafka/ -lrdkafka -lrdkafka++ +INCLUDEPATH += $$PWD/include/librdkafka +DEPENDPATH += $$PWD/include/librdkafka -INCLUDEPATH += $$PWD/librdkafka/include -DEPENDPATH += $$PWD/librdkafka/include +DISTFILES += conf/config.ini -DISTFILES += \ - conf/config.ini +unix:!macx: LIBS += -L$$PWD/lib/librdkafka/ -lrdkafka -lrdkafka++ diff --git a/PhaseCompAcq/PhaseCompAcq.pro b/PhaseCompAcq/PhaseCompAcq.pro index 7da4dd1..5eef2d1 100644 --- a/PhaseCompAcq/PhaseCompAcq.pro +++ b/PhaseCompAcq/PhaseCompAcq.pro @@ -29,10 +29,9 @@ !isEmpty(target.path): INSTALLS += target -win32: LIBS += -L$$PWD/librdkafka/ -lrdkafka -lrdkafka++ +INCLUDEPATH += $$PWD/include/librdkafka +DEPENDPATH += $$PWD/include/librdkafka -INCLUDEPATH += $$PWD/librdkafka/include -DEPENDPATH += $$PWD/librdkafka/include +DISTFILES += conf/config.ini -DISTFILES += \ - conf/config.ini +unix:!macx: LIBS += -L$$PWD/lib/librdkafka/ -lrdkafka -lrdkafka++ diff --git a/PhaseCompAcq/PhaseWindow.cpp b/PhaseCompAcq/PhaseWindow.cpp index 8999d5c..459dcaf 100644 --- a/PhaseCompAcq/PhaseWindow.cpp +++ b/PhaseCompAcq/PhaseWindow.cpp @@ -61,6 +61,8 @@ // 创建设备的widget this->generateWidgetForDevice(devCodeList.at(i), i); } + + this->createKafkaMessage(); } PhaseWindow::~PhaseWindow() @@ -157,3 +159,12 @@ this->tableModelList.append(channelModelList); } +void PhaseWindow::createKafkaMessage() +{ + this->kafkaUtil = new QKafkaUtil(this); + this->kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + this->kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + + kafkaUtil->createProducer(); + kafkaUtil->produceMessage("hello world kafka kylin"); +} diff --git a/PhaseCompAcq/PhaseCompAcq.pro b/PhaseCompAcq/PhaseCompAcq.pro index 7da4dd1..5eef2d1 100644 --- a/PhaseCompAcq/PhaseCompAcq.pro +++ b/PhaseCompAcq/PhaseCompAcq.pro @@ -29,10 +29,9 @@ !isEmpty(target.path): INSTALLS += target -win32: LIBS += -L$$PWD/librdkafka/ -lrdkafka -lrdkafka++ +INCLUDEPATH += $$PWD/include/librdkafka +DEPENDPATH += $$PWD/include/librdkafka -INCLUDEPATH += $$PWD/librdkafka/include -DEPENDPATH += $$PWD/librdkafka/include +DISTFILES += conf/config.ini -DISTFILES += \ - conf/config.ini +unix:!macx: LIBS += -L$$PWD/lib/librdkafka/ -lrdkafka -lrdkafka++ diff --git a/PhaseCompAcq/PhaseWindow.cpp b/PhaseCompAcq/PhaseWindow.cpp index 8999d5c..459dcaf 100644 --- a/PhaseCompAcq/PhaseWindow.cpp +++ b/PhaseCompAcq/PhaseWindow.cpp @@ -61,6 +61,8 @@ // 创建设备的widget this->generateWidgetForDevice(devCodeList.at(i), i); } + + this->createKafkaMessage(); } PhaseWindow::~PhaseWindow() @@ -157,3 +159,12 @@ this->tableModelList.append(channelModelList); } +void PhaseWindow::createKafkaMessage() +{ + this->kafkaUtil = new QKafkaUtil(this); + this->kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + this->kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + + kafkaUtil->createProducer(); + kafkaUtil->produceMessage("hello world kafka kylin"); +} diff --git a/PhaseCompAcq/PhaseWindow.h b/PhaseCompAcq/PhaseWindow.h index abd1823..67b8363 100644 --- a/PhaseCompAcq/PhaseWindow.h +++ b/PhaseCompAcq/PhaseWindow.h @@ -6,6 +6,7 @@ #include "PhaseDevice.h" #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" namespace Ui { class PhaseWindow; @@ -30,7 +31,10 @@ QList deviceList; QList> tableModelList; + QKafkaUtil * kafkaUtil; + void generateWidgetForDevice(QString devCode, int index); + void createKafkaMessage(); }; #endif // PHASEWINDOW_H diff --git a/PhaseCompAcq/PhaseCompAcq.pro b/PhaseCompAcq/PhaseCompAcq.pro index 7da4dd1..5eef2d1 100644 --- a/PhaseCompAcq/PhaseCompAcq.pro +++ b/PhaseCompAcq/PhaseCompAcq.pro @@ -29,10 +29,9 @@ !isEmpty(target.path): INSTALLS += target -win32: LIBS += -L$$PWD/librdkafka/ -lrdkafka -lrdkafka++ +INCLUDEPATH += $$PWD/include/librdkafka +DEPENDPATH += $$PWD/include/librdkafka -INCLUDEPATH += $$PWD/librdkafka/include -DEPENDPATH += $$PWD/librdkafka/include +DISTFILES += conf/config.ini -DISTFILES += \ - conf/config.ini +unix:!macx: LIBS += -L$$PWD/lib/librdkafka/ -lrdkafka -lrdkafka++ diff --git a/PhaseCompAcq/PhaseWindow.cpp b/PhaseCompAcq/PhaseWindow.cpp index 8999d5c..459dcaf 100644 --- a/PhaseCompAcq/PhaseWindow.cpp +++ b/PhaseCompAcq/PhaseWindow.cpp @@ -61,6 +61,8 @@ // 创建设备的widget this->generateWidgetForDevice(devCodeList.at(i), i); } + + this->createKafkaMessage(); } PhaseWindow::~PhaseWindow() @@ -157,3 +159,12 @@ this->tableModelList.append(channelModelList); } +void PhaseWindow::createKafkaMessage() +{ + this->kafkaUtil = new QKafkaUtil(this); + this->kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + this->kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + + kafkaUtil->createProducer(); + kafkaUtil->produceMessage("hello world kafka kylin"); +} diff --git a/PhaseCompAcq/PhaseWindow.h b/PhaseCompAcq/PhaseWindow.h index abd1823..67b8363 100644 --- a/PhaseCompAcq/PhaseWindow.h +++ b/PhaseCompAcq/PhaseWindow.h @@ -6,6 +6,7 @@ #include "PhaseDevice.h" #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" namespace Ui { class PhaseWindow; @@ -30,7 +31,10 @@ QList deviceList; QList> tableModelList; + QKafkaUtil * kafkaUtil; + void generateWidgetForDevice(QString devCode, int index); + void createKafkaMessage(); }; #endif // PHASEWINDOW_H diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.cpp b/PhaseCompAcq/common/utils/QKafkaUtil.cpp index 6ecd6f0..3f00452 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.cpp +++ b/PhaseCompAcq/common/utils/QKafkaUtil.cpp @@ -16,7 +16,7 @@ } -int QKafkaUtil::initKafkaConf() +int QKafkaUtil::createProducer() { int result; result = this->conf->set("bootstrap.servers", this->brokers.toStdString(), errStr); @@ -38,3 +38,22 @@ result = 1; return result; } + +int QKafkaUtil::produceMessage(QString message) +{ + auto retCode = producer->produce(topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, + const_cast(message.toStdString().c_str()), message.size(), + nullptr, 0, 0, nullptr, nullptr); + + if (retCode != RdKafka::ERR_NO_ERROR) + { + std::cerr << "Failed to produce to topic " << topic.toStdString() << ": " << + RdKafka::err2str(retCode) << std::endl; + } else + { + std::cerr << "Enqueued message (" << message.size() << " bytes) " << + "for topic " << topic.toStdString() << "[" << message.toStdString() <<"]" << std::endl; + } + + return retCode; +} diff --git a/PhaseCompAcq/PhaseCompAcq.pro b/PhaseCompAcq/PhaseCompAcq.pro index 7da4dd1..5eef2d1 100644 --- a/PhaseCompAcq/PhaseCompAcq.pro +++ b/PhaseCompAcq/PhaseCompAcq.pro @@ -29,10 +29,9 @@ !isEmpty(target.path): INSTALLS += target -win32: LIBS += -L$$PWD/librdkafka/ -lrdkafka -lrdkafka++ +INCLUDEPATH += $$PWD/include/librdkafka +DEPENDPATH += $$PWD/include/librdkafka -INCLUDEPATH += $$PWD/librdkafka/include -DEPENDPATH += $$PWD/librdkafka/include +DISTFILES += conf/config.ini -DISTFILES += \ - conf/config.ini +unix:!macx: LIBS += -L$$PWD/lib/librdkafka/ -lrdkafka -lrdkafka++ diff --git a/PhaseCompAcq/PhaseWindow.cpp b/PhaseCompAcq/PhaseWindow.cpp index 8999d5c..459dcaf 100644 --- a/PhaseCompAcq/PhaseWindow.cpp +++ b/PhaseCompAcq/PhaseWindow.cpp @@ -61,6 +61,8 @@ // 创建设备的widget this->generateWidgetForDevice(devCodeList.at(i), i); } + + this->createKafkaMessage(); } PhaseWindow::~PhaseWindow() @@ -157,3 +159,12 @@ this->tableModelList.append(channelModelList); } +void PhaseWindow::createKafkaMessage() +{ + this->kafkaUtil = new QKafkaUtil(this); + this->kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + this->kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + + kafkaUtil->createProducer(); + kafkaUtil->produceMessage("hello world kafka kylin"); +} diff --git a/PhaseCompAcq/PhaseWindow.h b/PhaseCompAcq/PhaseWindow.h index abd1823..67b8363 100644 --- a/PhaseCompAcq/PhaseWindow.h +++ b/PhaseCompAcq/PhaseWindow.h @@ -6,6 +6,7 @@ #include "PhaseDevice.h" #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" namespace Ui { class PhaseWindow; @@ -30,7 +31,10 @@ QList deviceList; QList> tableModelList; + QKafkaUtil * kafkaUtil; + void generateWidgetForDevice(QString devCode, int index); + void createKafkaMessage(); }; #endif // PHASEWINDOW_H diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.cpp b/PhaseCompAcq/common/utils/QKafkaUtil.cpp index 6ecd6f0..3f00452 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.cpp +++ b/PhaseCompAcq/common/utils/QKafkaUtil.cpp @@ -16,7 +16,7 @@ } -int QKafkaUtil::initKafkaConf() +int QKafkaUtil::createProducer() { int result; result = this->conf->set("bootstrap.servers", this->brokers.toStdString(), errStr); @@ -38,3 +38,22 @@ result = 1; return result; } + +int QKafkaUtil::produceMessage(QString message) +{ + auto retCode = producer->produce(topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, + const_cast(message.toStdString().c_str()), message.size(), + nullptr, 0, 0, nullptr, nullptr); + + if (retCode != RdKafka::ERR_NO_ERROR) + { + std::cerr << "Failed to produce to topic " << topic.toStdString() << ": " << + RdKafka::err2str(retCode) << std::endl; + } else + { + std::cerr << "Enqueued message (" << message.size() << " bytes) " << + "for topic " << topic.toStdString() << "[" << message.toStdString() <<"]" << std::endl; + } + + return retCode; +} diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.h b/PhaseCompAcq/common/utils/QKafkaUtil.h index 106b289..289ea6f 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.h +++ b/PhaseCompAcq/common/utils/QKafkaUtil.h @@ -3,8 +3,7 @@ #include -#include "librdkafka/include/rdkafkacpp.h" -#include "librdkafka/include/wingetopt.c" +#include "include/librdkafka/rdkafkacpp.h" class QKafkaUtil : public QObject { @@ -15,7 +14,8 @@ void setBrokers(QString brokers); void setTopic(QString topic); - int initKafkaConf(); + int createProducer(); + int produceMessage(QString message); private: QString brokers; diff --git a/PhaseCompAcq/PhaseCompAcq.pro b/PhaseCompAcq/PhaseCompAcq.pro index 7da4dd1..5eef2d1 100644 --- a/PhaseCompAcq/PhaseCompAcq.pro +++ b/PhaseCompAcq/PhaseCompAcq.pro @@ -29,10 +29,9 @@ !isEmpty(target.path): INSTALLS += target -win32: LIBS += -L$$PWD/librdkafka/ -lrdkafka -lrdkafka++ +INCLUDEPATH += $$PWD/include/librdkafka +DEPENDPATH += $$PWD/include/librdkafka -INCLUDEPATH += $$PWD/librdkafka/include -DEPENDPATH += $$PWD/librdkafka/include +DISTFILES += conf/config.ini -DISTFILES += \ - conf/config.ini +unix:!macx: LIBS += -L$$PWD/lib/librdkafka/ -lrdkafka -lrdkafka++ diff --git a/PhaseCompAcq/PhaseWindow.cpp b/PhaseCompAcq/PhaseWindow.cpp index 8999d5c..459dcaf 100644 --- a/PhaseCompAcq/PhaseWindow.cpp +++ b/PhaseCompAcq/PhaseWindow.cpp @@ -61,6 +61,8 @@ // 创建设备的widget this->generateWidgetForDevice(devCodeList.at(i), i); } + + this->createKafkaMessage(); } PhaseWindow::~PhaseWindow() @@ -157,3 +159,12 @@ this->tableModelList.append(channelModelList); } +void PhaseWindow::createKafkaMessage() +{ + this->kafkaUtil = new QKafkaUtil(this); + this->kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + this->kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + + kafkaUtil->createProducer(); + kafkaUtil->produceMessage("hello world kafka kylin"); +} diff --git a/PhaseCompAcq/PhaseWindow.h b/PhaseCompAcq/PhaseWindow.h index abd1823..67b8363 100644 --- a/PhaseCompAcq/PhaseWindow.h +++ b/PhaseCompAcq/PhaseWindow.h @@ -6,6 +6,7 @@ #include "PhaseDevice.h" #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" namespace Ui { class PhaseWindow; @@ -30,7 +31,10 @@ QList deviceList; QList> tableModelList; + QKafkaUtil * kafkaUtil; + void generateWidgetForDevice(QString devCode, int index); + void createKafkaMessage(); }; #endif // PHASEWINDOW_H diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.cpp b/PhaseCompAcq/common/utils/QKafkaUtil.cpp index 6ecd6f0..3f00452 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.cpp +++ b/PhaseCompAcq/common/utils/QKafkaUtil.cpp @@ -16,7 +16,7 @@ } -int QKafkaUtil::initKafkaConf() +int QKafkaUtil::createProducer() { int result; result = this->conf->set("bootstrap.servers", this->brokers.toStdString(), errStr); @@ -38,3 +38,22 @@ result = 1; return result; } + +int QKafkaUtil::produceMessage(QString message) +{ + auto retCode = producer->produce(topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, + const_cast(message.toStdString().c_str()), message.size(), + nullptr, 0, 0, nullptr, nullptr); + + if (retCode != RdKafka::ERR_NO_ERROR) + { + std::cerr << "Failed to produce to topic " << topic.toStdString() << ": " << + RdKafka::err2str(retCode) << std::endl; + } else + { + std::cerr << "Enqueued message (" << message.size() << " bytes) " << + "for topic " << topic.toStdString() << "[" << message.toStdString() <<"]" << std::endl; + } + + return retCode; +} diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.h b/PhaseCompAcq/common/utils/QKafkaUtil.h index 106b289..289ea6f 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.h +++ b/PhaseCompAcq/common/utils/QKafkaUtil.h @@ -3,8 +3,7 @@ #include -#include "librdkafka/include/rdkafkacpp.h" -#include "librdkafka/include/wingetopt.c" +#include "include/librdkafka/rdkafkacpp.h" class QKafkaUtil : public QObject { @@ -15,7 +14,8 @@ void setBrokers(QString brokers); void setTopic(QString topic); - int initKafkaConf(); + int createProducer(); + int produceMessage(QString message); private: QString brokers; diff --git a/PhaseCompAcq/common/utils/SettingConfig.cpp b/PhaseCompAcq/common/utils/SettingConfig.cpp index ef037c9..c664a21 100644 --- a/PhaseCompAcq/common/utils/SettingConfig.cpp +++ b/PhaseCompAcq/common/utils/SettingConfig.cpp @@ -8,6 +8,9 @@ PORT_NAMES = getProperty("com", "portNames").toString(); BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + + KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); + KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); } diff --git a/PhaseCompAcq/PhaseCompAcq.pro b/PhaseCompAcq/PhaseCompAcq.pro index 7da4dd1..5eef2d1 100644 --- a/PhaseCompAcq/PhaseCompAcq.pro +++ b/PhaseCompAcq/PhaseCompAcq.pro @@ -29,10 +29,9 @@ !isEmpty(target.path): INSTALLS += target -win32: LIBS += -L$$PWD/librdkafka/ -lrdkafka -lrdkafka++ +INCLUDEPATH += $$PWD/include/librdkafka +DEPENDPATH += $$PWD/include/librdkafka -INCLUDEPATH += $$PWD/librdkafka/include -DEPENDPATH += $$PWD/librdkafka/include +DISTFILES += conf/config.ini -DISTFILES += \ - conf/config.ini +unix:!macx: LIBS += -L$$PWD/lib/librdkafka/ -lrdkafka -lrdkafka++ diff --git a/PhaseCompAcq/PhaseWindow.cpp b/PhaseCompAcq/PhaseWindow.cpp index 8999d5c..459dcaf 100644 --- a/PhaseCompAcq/PhaseWindow.cpp +++ b/PhaseCompAcq/PhaseWindow.cpp @@ -61,6 +61,8 @@ // 创建设备的widget this->generateWidgetForDevice(devCodeList.at(i), i); } + + this->createKafkaMessage(); } PhaseWindow::~PhaseWindow() @@ -157,3 +159,12 @@ this->tableModelList.append(channelModelList); } +void PhaseWindow::createKafkaMessage() +{ + this->kafkaUtil = new QKafkaUtil(this); + this->kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + this->kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + + kafkaUtil->createProducer(); + kafkaUtil->produceMessage("hello world kafka kylin"); +} diff --git a/PhaseCompAcq/PhaseWindow.h b/PhaseCompAcq/PhaseWindow.h index abd1823..67b8363 100644 --- a/PhaseCompAcq/PhaseWindow.h +++ b/PhaseCompAcq/PhaseWindow.h @@ -6,6 +6,7 @@ #include "PhaseDevice.h" #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" namespace Ui { class PhaseWindow; @@ -30,7 +31,10 @@ QList deviceList; QList> tableModelList; + QKafkaUtil * kafkaUtil; + void generateWidgetForDevice(QString devCode, int index); + void createKafkaMessage(); }; #endif // PHASEWINDOW_H diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.cpp b/PhaseCompAcq/common/utils/QKafkaUtil.cpp index 6ecd6f0..3f00452 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.cpp +++ b/PhaseCompAcq/common/utils/QKafkaUtil.cpp @@ -16,7 +16,7 @@ } -int QKafkaUtil::initKafkaConf() +int QKafkaUtil::createProducer() { int result; result = this->conf->set("bootstrap.servers", this->brokers.toStdString(), errStr); @@ -38,3 +38,22 @@ result = 1; return result; } + +int QKafkaUtil::produceMessage(QString message) +{ + auto retCode = producer->produce(topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, + const_cast(message.toStdString().c_str()), message.size(), + nullptr, 0, 0, nullptr, nullptr); + + if (retCode != RdKafka::ERR_NO_ERROR) + { + std::cerr << "Failed to produce to topic " << topic.toStdString() << ": " << + RdKafka::err2str(retCode) << std::endl; + } else + { + std::cerr << "Enqueued message (" << message.size() << " bytes) " << + "for topic " << topic.toStdString() << "[" << message.toStdString() <<"]" << std::endl; + } + + return retCode; +} diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.h b/PhaseCompAcq/common/utils/QKafkaUtil.h index 106b289..289ea6f 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.h +++ b/PhaseCompAcq/common/utils/QKafkaUtil.h @@ -3,8 +3,7 @@ #include -#include "librdkafka/include/rdkafkacpp.h" -#include "librdkafka/include/wingetopt.c" +#include "include/librdkafka/rdkafkacpp.h" class QKafkaUtil : public QObject { @@ -15,7 +14,8 @@ void setBrokers(QString brokers); void setTopic(QString topic); - int initKafkaConf(); + int createProducer(); + int produceMessage(QString message); private: QString brokers; diff --git a/PhaseCompAcq/common/utils/SettingConfig.cpp b/PhaseCompAcq/common/utils/SettingConfig.cpp index ef037c9..c664a21 100644 --- a/PhaseCompAcq/common/utils/SettingConfig.cpp +++ b/PhaseCompAcq/common/utils/SettingConfig.cpp @@ -8,6 +8,9 @@ PORT_NAMES = getProperty("com", "portNames").toString(); BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + + KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); + KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); } diff --git a/PhaseCompAcq/common/utils/SettingConfig.h b/PhaseCompAcq/common/utils/SettingConfig.h index 2d68710..08ab277 100644 --- a/PhaseCompAcq/common/utils/SettingConfig.h +++ b/PhaseCompAcq/common/utils/SettingConfig.h @@ -31,6 +31,9 @@ int BAUD_RATE; QString DEV_CODES; + QString KAFKA_BROKERS; + QString KAFKA_DATA_TOPIC; + private: SettingConfig(); diff --git a/PhaseCompAcq/PhaseCompAcq.pro b/PhaseCompAcq/PhaseCompAcq.pro index 7da4dd1..5eef2d1 100644 --- a/PhaseCompAcq/PhaseCompAcq.pro +++ b/PhaseCompAcq/PhaseCompAcq.pro @@ -29,10 +29,9 @@ !isEmpty(target.path): INSTALLS += target -win32: LIBS += -L$$PWD/librdkafka/ -lrdkafka -lrdkafka++ +INCLUDEPATH += $$PWD/include/librdkafka +DEPENDPATH += $$PWD/include/librdkafka -INCLUDEPATH += $$PWD/librdkafka/include -DEPENDPATH += $$PWD/librdkafka/include +DISTFILES += conf/config.ini -DISTFILES += \ - conf/config.ini +unix:!macx: LIBS += -L$$PWD/lib/librdkafka/ -lrdkafka -lrdkafka++ diff --git a/PhaseCompAcq/PhaseWindow.cpp b/PhaseCompAcq/PhaseWindow.cpp index 8999d5c..459dcaf 100644 --- a/PhaseCompAcq/PhaseWindow.cpp +++ b/PhaseCompAcq/PhaseWindow.cpp @@ -61,6 +61,8 @@ // 创建设备的widget this->generateWidgetForDevice(devCodeList.at(i), i); } + + this->createKafkaMessage(); } PhaseWindow::~PhaseWindow() @@ -157,3 +159,12 @@ this->tableModelList.append(channelModelList); } +void PhaseWindow::createKafkaMessage() +{ + this->kafkaUtil = new QKafkaUtil(this); + this->kafkaUtil->setBrokers(SettingConfig::getInstance().KAFKA_BROKERS); + this->kafkaUtil->setTopic(SettingConfig::getInstance().KAFKA_DATA_TOPIC); + + kafkaUtil->createProducer(); + kafkaUtil->produceMessage("hello world kafka kylin"); +} diff --git a/PhaseCompAcq/PhaseWindow.h b/PhaseCompAcq/PhaseWindow.h index abd1823..67b8363 100644 --- a/PhaseCompAcq/PhaseWindow.h +++ b/PhaseCompAcq/PhaseWindow.h @@ -6,6 +6,7 @@ #include "PhaseDevice.h" #include "common/utils/SettingConfig.h" +#include "common/utils/QKafkaUtil.h" namespace Ui { class PhaseWindow; @@ -30,7 +31,10 @@ QList deviceList; QList> tableModelList; + QKafkaUtil * kafkaUtil; + void generateWidgetForDevice(QString devCode, int index); + void createKafkaMessage(); }; #endif // PHASEWINDOW_H diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.cpp b/PhaseCompAcq/common/utils/QKafkaUtil.cpp index 6ecd6f0..3f00452 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.cpp +++ b/PhaseCompAcq/common/utils/QKafkaUtil.cpp @@ -16,7 +16,7 @@ } -int QKafkaUtil::initKafkaConf() +int QKafkaUtil::createProducer() { int result; result = this->conf->set("bootstrap.servers", this->brokers.toStdString(), errStr); @@ -38,3 +38,22 @@ result = 1; return result; } + +int QKafkaUtil::produceMessage(QString message) +{ + auto retCode = producer->produce(topic.toStdString(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, + const_cast(message.toStdString().c_str()), message.size(), + nullptr, 0, 0, nullptr, nullptr); + + if (retCode != RdKafka::ERR_NO_ERROR) + { + std::cerr << "Failed to produce to topic " << topic.toStdString() << ": " << + RdKafka::err2str(retCode) << std::endl; + } else + { + std::cerr << "Enqueued message (" << message.size() << " bytes) " << + "for topic " << topic.toStdString() << "[" << message.toStdString() <<"]" << std::endl; + } + + return retCode; +} diff --git a/PhaseCompAcq/common/utils/QKafkaUtil.h b/PhaseCompAcq/common/utils/QKafkaUtil.h index 106b289..289ea6f 100644 --- a/PhaseCompAcq/common/utils/QKafkaUtil.h +++ b/PhaseCompAcq/common/utils/QKafkaUtil.h @@ -3,8 +3,7 @@ #include -#include "librdkafka/include/rdkafkacpp.h" -#include "librdkafka/include/wingetopt.c" +#include "include/librdkafka/rdkafkacpp.h" class QKafkaUtil : public QObject { @@ -15,7 +14,8 @@ void setBrokers(QString brokers); void setTopic(QString topic); - int initKafkaConf(); + int createProducer(); + int produceMessage(QString message); private: QString brokers; diff --git a/PhaseCompAcq/common/utils/SettingConfig.cpp b/PhaseCompAcq/common/utils/SettingConfig.cpp index ef037c9..c664a21 100644 --- a/PhaseCompAcq/common/utils/SettingConfig.cpp +++ b/PhaseCompAcq/common/utils/SettingConfig.cpp @@ -8,6 +8,9 @@ PORT_NAMES = getProperty("com", "portNames").toString(); BAUD_RATE = getProperty("com", "baudRate").toUInt(); DEV_CODES = getProperty("com", "devCodes").toString(); + + KAFKA_BROKERS = getProperty("kafka", "brokers").toString(); + KAFKA_DATA_TOPIC = getProperty("kafka", "dataTopic").toString(); } diff --git a/PhaseCompAcq/common/utils/SettingConfig.h b/PhaseCompAcq/common/utils/SettingConfig.h index 2d68710..08ab277 100644 --- a/PhaseCompAcq/common/utils/SettingConfig.h +++ b/PhaseCompAcq/common/utils/SettingConfig.h @@ -31,6 +31,9 @@ int BAUD_RATE; QString DEV_CODES; + QString KAFKA_BROKERS; + QString KAFKA_DATA_TOPIC; + private: SettingConfig(); diff --git a/PhaseCompAcq/conf/config.ini b/PhaseCompAcq/conf/config.ini index 6bd8c82..631f014 100644 --- a/PhaseCompAcq/conf/config.ini +++ b/PhaseCompAcq/conf/config.ini @@ -1,5 +1,8 @@ -[com] -#串口名 +[com] portNames="COM1,COM2,COM3" devCodes="9101,9102,9103" baudRate=115200 + +[kafka] +brokers="111.198.10.15:12502" +dataTopic="cppTest"