Newer
Older
ZXSSCJ / PhaseCompAcq / common / utils / QKafkaConsumer.cpp
#include "QKafkaConsumer.h"
#include "SettingConfig.h"
#include <iostream>
#include <QJsonParseError>

static volatile int runFlag = 1;
static bool exit_eof = false;

QKafkaConsumer::QKafkaConsumer(QObject *parent) : QThread(parent)
{
    this->conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    this->tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("enable.partition.eof", "true", errStr);
}

void QKafkaConsumer::setBrokers(QString brokers)
{
    this->brokers = brokers;
}
void QKafkaConsumer::setTopic(QString topic)
{
    this->topic = topic;
}

int QKafkaConsumer::createConsumer()
{
    int ret = conf->set("metadata.broker.list", brokers.toStdString(), errStr);
    if (SettingConfig::getInstance().NEED_SASL == 1)
    {
        conf->set("sasl.username", SettingConfig::getInstance().SASL_USERNAME.toStdString(), errStr);
        conf->set("sasl.password", SettingConfig::getInstance().SASL_PASSWORD.toStdString(), errStr);
        conf->set("security.protocol", "sasl_plaintext", errStr);
        conf->set("sasl.mechanisms", "PLAIN", errStr);
    }

    if (ret != RdKafka::Conf::CONF_OK)
    {
        std::cerr << "RdKafka conf set brokerlist failed :" << errStr.c_str() << std::endl;
    }

    consumer = RdKafka::Consumer::create(conf, errStr);
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errStr << std::endl;
        return -1;
    }

    std::cout << "% Created consumer " << consumer->name() << std::endl;

    return 1;
}

void QKafkaConsumer::run()
{
    RdKafka::Topic * topic = RdKafka::Topic::create(consumer, this->topic.toStdString(), tconf, errStr);
    if (!topic) {
        std::cerr << "Failed to create topic: " << errStr << std::endl;
    }

    RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_END);
    if (resp != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
    }

    while (runFlag)
    {
        RdKafka::Message * message = consumer->consume(topic, 0, 200);
        messageConsume(message);
    }
}

void QKafkaConsumer::exitThread()
{
    runFlag = false;
}

void QKafkaConsumer::messageConsume(RdKafka::Message* message) {
    const RdKafka::Headers *headers;

    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        break;

    case RdKafka::ERR_NO_ERROR:
    {
        /* Real message */
//        std::cout << "Read msg at offset " << message->offset() << std::endl;
//        printf("%.*s\n", static_cast<int>(message->len()), static_cast<const char *>(message->payload()));

        QString messageStr = static_cast<const char *>(message->payload());
        QJsonParseError jsonErr;
        QJsonDocument doc = QJsonDocument::fromJson(messageStr.toUtf8(), &jsonErr);
        if (jsonErr.error == QJsonParseError::NoError)
        {
            QJsonObject obj = doc.object();
            obj.insert("cmdStr", messageStr);
            emit messageRecieved(obj);
        }

        if (message->key()) {
            std::cout << "Key: " << *message->key() << std::endl;
        }
        headers = message->headers();
        if (headers) {
            std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
            for (size_t i = 0 ; i < hdrs.size() ; i++) {
                const RdKafka::Headers::Header hdr = hdrs[i];

                if (hdr.value() != NULL)
                    printf(" Header: %s = \"%.*s\"\n",
                           hdr.key().c_str(),
                           (int)hdr.value_size(), (const char *)hdr.value());
                else
                    printf(" Header:  %s = NULL\n", hdr.key().c_str());
            }
        }
        break;
    }

    case RdKafka::ERR__PARTITION_EOF:
        /* Last message */
        if (exit_eof) {
            runFlag = 0;
        }
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        runFlag = 0;
        break;

    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        runFlag = 0;
    }
}