#include "QKafkaUtil.h"

#include <iostream>
#include <string>

/// Builds the helper and allocates the global librdkafka configuration
/// object that createProducer() later fills in and consumes.
///
/// @param parent  Forwarded to the QObject base class.
QKafkaUtil::QKafkaUtil(QObject *parent) : QObject(parent)
{
    this->conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    // Start from a known "no producer yet" state so produceMessage() can
    // detect that createProducer() has not (successfully) run.
    this->producer = nullptr;
    // NOTE(review): neither conf nor producer is released anywhere in this
    // file; confirm whether ownership is handled elsewhere (e.g. a destructor
    // declared in the header).
}

/// Stores the broker list that createProducer() passes to librdkafka as
/// "bootstrap.servers".
void QKafkaUtil::setBrokers(QString brokers)
{
    this->brokers = brokers;
}

/// Stores the default topic used by the single-argument produceMessage().
void QKafkaUtil::setTopic(QString topic)
{
    this->topic = topic;
}

/// Applies the stored broker list to the configuration and creates the
/// producer instance.
///
/// @return  1 on success,
///          RdKafka::Conf::CONF_INVALID (-1) when "bootstrap.servers" is
///          rejected by the configuration,
///          -2 when the producer itself cannot be created.
///          In both failure cases the reason is written to std::cerr.
int QKafkaUtil::createProducer()
{
    const int confResult =
        this->conf->set("bootstrap.servers", this->brokers.toStdString(), errStr);
    if (confResult != RdKafka::Conf::CONF_OK) {
        std::cerr << errStr << std::endl;
        return RdKafka::Conf::CONF_INVALID; // -1
    }

    this->producer = RdKafka::Producer::create(this->conf, errStr);
    if (this->producer == nullptr) {
        std::cerr << "Failed to create producer: " << errStr << std::endl;
        return -2;
    }

    return 1;
}

/// Enqueues @p message on the default topic set via setTopic().
///
/// Delegates to the two-argument overload so both paths share one
/// implementation and one set of log messages.
///
/// @return  The RdKafka::ErrorCode of the produce call as an int
///          (RdKafka::ERR_NO_ERROR on success).
int QKafkaUtil::produceMessage(QString message)
{
    return produceMessage(this->topic, message);
}

/// Enqueues @p message on @p topic, letting librdkafka pick the partition
/// (PARTITION_UA) and copy the payload (RK_MSG_COPY).
///
/// The outcome is logged to std::cerr in both the success and failure case.
///
/// @param topic    Name of the destination topic.
/// @param message  Text to send; it is transmitted as the bytes returned by
///                 QString::toStdString().
/// @return  The RdKafka::ErrorCode of the produce call as an int
///          (RdKafka::ERR_NO_ERROR on success), or RdKafka::ERR__STATE when
///          no producer has been created yet.
int QKafkaUtil::produceMessage(QString topic, QString message)
{
    const std::string topicName = topic.toStdString();
    // Convert once and keep the std::string alive for the whole call. Its
    // size() is the real byte length of the payload; QString::size() counts
    // UTF-16 code units and would truncate any non-ASCII message.
    const std::string payload = message.toStdString();

    if (this->producer == nullptr) {
        std::cerr << "Failed to produce to topic " << topicName << ": "
                  << "producer has not been created" << std::endl;
        return RdKafka::ERR__STATE;
    }

    // produce() takes a non-const void* payload; with RK_MSG_COPY the library
    // copies the bytes, so casting away const here does not lead to mutation.
    const auto retCode = this->producer->produce(topicName,
                                                 RdKafka::Topic::PARTITION_UA,
                                                 RdKafka::Producer::RK_MSG_COPY,
                                                 const_cast<char *>(payload.c_str()),
                                                 payload.size(),
                                                 nullptr, 0,
                                                 0,
                                                 nullptr,
                                                 nullptr);

    if (retCode != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Failed to produce to topic " << topicName << ": "
                  << RdKafka::err2str(retCode) << std::endl;
    } else {
        std::cerr << "Enqueued message (" << payload.size() << " bytes) "
                  << "for topic " << topicName
                  << "[" << payload << "]" << std::endl;
    }

    return retCode;
}