// Implementation of QKafkaConsumer: a QThread that polls one Kafka topic
// partition through the librdkafka legacy (simple) consumer API and re-emits
// every JSON payload as a Qt signal.

#include "QKafkaConsumer.h"
#include "SettingConfig.h"

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>

#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonParseError>
#include <QString>

namespace {

// Loop-control flag for run(). It is written by exitThread() (normally from
// another thread) and by messageConsume(), and read by the polling loop, so it
// must be atomic: `volatile` gives no inter-thread guarantees.
// NOTE(review): the flag is file-scoped, so it is shared by every
// QKafkaConsumer instance and is never reset to true once cleared.
std::atomic<bool> runFlag{true};

// When true, reaching the end of the partition stops the polling loop.
constexpr bool kExitOnPartitionEof = false;

// Partition consumed by run().
constexpr std::int32_t kPartition = 0;

// Maximum time a single consume() call blocks, in milliseconds.
constexpr int kConsumeTimeoutMs = 200;

// Converts a message payload to a QString (decoded as UTF-8).
//
// Kafka payloads are length-delimited, not NUL-terminated, so the read is
// bounded by message.len(). Decoding still stops at the first NUL byte inside
// that range, which keeps payloads that carry a trailing terminator parseable.
QString payloadToString(const RdKafka::Message &message)
{
    const char *data = static_cast<const char *>(message.payload());
    const std::size_t size = message.len();
    if (data == nullptr || size == 0) {
        return QString();
    }

    const void *terminator = std::memchr(data, '\0', size);
    const std::size_t textSize =
        terminator != nullptr
            ? static_cast<std::size_t>(static_cast<const char *>(terminator) - data)
            : size;
    return QString::fromUtf8(data, static_cast<int>(textSize));
}

} // namespace

/// Creates the global and topic configuration objects and enables
/// partition-EOF events on the global configuration.
QKafkaConsumer::QKafkaConsumer(QObject *parent)
    : QThread(parent)
{
    this->conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    this->tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    if (conf->set("enable.partition.eof", "true", errStr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set enable.partition.eof failed :" << errStr << std::endl;
    }
}

/// Stores the broker list that createConsumer() passes to librdkafka.
void QKafkaConsumer::setBrokers(QString brokers)
{
    this->brokers = brokers;
}

/// Stores the name of the topic that run() consumes.
void QKafkaConsumer::setTopic(QString topic)
{
    this->topic = topic;
}

/// Applies the broker list (and the SASL settings when
/// SettingConfig::NEED_SASL is 1) and creates the consumer handle.
///
/// @return 1 on success, -1 when a configuration value is rejected or the
///         consumer cannot be created. The reason is written to std::cerr.
int QKafkaConsumer::createConsumer()
{
    // Sets one configuration property and reports a rejected value. Only the
    // property label is logged, never the value (it may be a password).
    const auto setOption = [this](const std::string &name,
                                  const std::string &value,
                                  const std::string &label) {
        if (conf->set(name, value, errStr) == RdKafka::Conf::CONF_OK) {
            return true;
        }
        std::cerr << "RdKafka conf set " << label << " failed :" << errStr << std::endl;
        return false;
    };

    if (!setOption("metadata.broker.list", brokers.toStdString(), "brokerlist")) {
        return -1;
    }

    const auto &settings = SettingConfig::getInstance();
    if (settings.NEED_SASL == 1) {
        // A rejected SASL option must not be ignored: the consumer would
        // otherwise be created without the requested authentication settings.
        const bool saslConfigured =
            setOption("sasl.username", settings.SASL_USERNAME.toStdString(), "sasl.username")
            && setOption("sasl.password", settings.SASL_PASSWORD.toStdString(), "sasl.password")
            && setOption("security.protocol", "sasl_plaintext", "security.protocol")
            && setOption("sasl.mechanisms", "PLAIN", "sasl.mechanisms");
        if (!saslConfigured) {
            return -1;
        }
    }

    consumer = RdKafka::Consumer::create(conf, errStr);
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errStr << std::endl;
        return -1;
    }

    std::cout << "% Created consumer " << consumer->name() << std::endl;
    return 1;
}

/// Thread entry point: starts consuming partition 0 of the configured topic
/// from the end of the log and dispatches every polled message to
/// messageConsume() until the run flag is cleared.
///
/// Returns early (after logging) when the consumer is null, or when the topic
/// handle cannot be created or started.
void QKafkaConsumer::run()
{
    // createConsumer() leaves `consumer` null when it fails.
    if (!consumer) {
        std::cerr << "Failed to start consumer: consumer has not been created" << std::endl;
        return;
    }

    // Owned here so the handle is released on every exit path.
    const std::unique_ptr<RdKafka::Topic> topicHandle(
        RdKafka::Topic::create(consumer, this->topic.toStdString(), tconf, errStr));
    if (!topicHandle) {
        std::cerr << "Failed to create topic: " << errStr << std::endl;
        return;
    }

    const RdKafka::ErrorCode resp =
        consumer->start(topicHandle.get(), kPartition, RdKafka::Topic::OFFSET_END);
    if (resp != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
        return;
    }

    while (runFlag) {
        // consume() hands ownership of the returned message to the caller;
        // without the unique_ptr every poll (including timeouts) would leak.
        const std::unique_ptr<RdKafka::Message> message(
            consumer->consume(topicHandle.get(), kPartition, kConsumeTimeoutMs));
        messageConsume(message.get());
    }

    // Stop the partition before the topic handle goes out of scope.
    consumer->stop(topicHandle.get(), kPartition);
}

/// Asks the polling loop in run() to finish after its current consume() call.
void QKafkaConsumer::exitThread()
{
    runFlag = false;
}

/// Handles one result of consume().
///
/// - Timeout: ignored.
/// - Real message: the payload is parsed as JSON; on success the object, with
///   the raw text added under "cmdStr", is emitted through messageRecieved().
///   The key and headers, when present, are printed to stdout.
/// - Partition EOF: stops the loop only when kExitOnPartitionEof is set.
/// - Any other error: logged to std::cerr and the polling loop is stopped.
///
/// @param message Result returned by consume(); not owned by this function.
void QKafkaConsumer::messageConsume(RdKafka::Message *message)
{
    if (message == nullptr) {
        return;
    }

    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        break;

    case RdKafka::ERR_NO_ERROR: {
        /* Real message */
        const QString messageStr = payloadToString(*message);

        QJsonParseError jsonErr;
        const QJsonDocument doc = QJsonDocument::fromJson(messageStr.toUtf8(), &jsonErr);
        if (jsonErr.error == QJsonParseError::NoError) {
            QJsonObject obj = doc.object();
            obj.insert("cmdStr", messageStr);
            emit messageRecieved(obj);
        }

        if (message->key()) {
            std::cout << "Key: " << *message->key() << std::endl;
        }

        const RdKafka::Headers *headers = message->headers();
        if (headers) {
            for (const RdKafka::Headers::Header &hdr : headers->get_all()) {
                if (hdr.value() != nullptr) {
                    // Header values are length-delimited, hence "%.*s".
                    printf(" Header: %s = \"%.*s\"\n",
                           hdr.key().c_str(),
                           static_cast<int>(hdr.value_size()),
                           static_cast<const char *>(hdr.value()));
                } else {
                    printf(" Header: %s = NULL\n", hdr.key().c_str());
                }
            }
        }
        break;
    }

    case RdKafka::ERR__PARTITION_EOF:
        /* Last message */
        if (kExitOnPartitionEof) {
            runFlag = false;
        }
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        runFlag = false;
        break;
    }
}