Newer
Older
ZXSSCJ / DevStatusAcq / common / utils / QKafkaUtil.cpp
#include "QKafkaUtil.h"
#include <iostream>

/**
 * @brief Construct the helper and create its global-scope Kafka configuration object.
 *
 * The configuration created here is the one later filled in and consumed by
 * createProducer().
 *
 * @param parent Forwarded to the QObject base-class constructor.
 */
QKafkaUtil::QKafkaUtil(QObject *parent)
    : QObject(parent),
      conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL))
{
}

/**
 * @brief Store the broker list that createProducer() passes as "bootstrap.servers".
 * @param brokers Broker list string; stored as given, no validation is done here.
 */
void QKafkaUtil::setBrokers(QString brokers)
{
    // The parameter is already a private copy, so exchange it with the member;
    // the previous value is released when the parameter goes out of scope.
    this->brokers.swap(brokers);
}
/**
 * @brief Store the topic name that produceMessage() publishes to.
 * @param topic Topic name; stored as given, no validation is done here.
 */
void QKafkaUtil::setTopic(QString topic)
{
    // The parameter is already a private copy, so exchange it with the member;
    // the previous value is released when the parameter goes out of scope.
    this->topic.swap(topic);
}


int QKafkaUtil::createProducer()
{
    int result;
    result = this->conf->set("bootstrap.servers", this->brokers.toStdString(), errStr);

    if (result != RdKafka::Conf::CONF_OK)
    {
        std::cerr << errStr << std::endl;

        return RdKafka::Conf::CONF_INVALID; // -1
    }

    this->producer = RdKafka::Producer::create(this->conf, errStr);
    if (producer == 0)
    {
        std::cerr << "Failed to create producer: " << errStr << std::endl;
        return -2;
    }

    result = 1;
    return result;
}

/**
 * @brief Enqueue one message on the configured topic and log the outcome to std::cerr.
 *
 * The message is converted to a byte string with QString::toStdString() and handed
 * to the producer with RK_MSG_COPY, no explicit partition (PARTITION_UA) and no key,
 * timestamp, headers or opaque pointer.
 *
 * @param message Text to send.
 * @return The RdKafka::ErrorCode returned by produce(), as an int
 *         (RdKafka::ERR_NO_ERROR on success), or RdKafka::ERR__STATE when no
 *         producer is available (createProducer() not called or failed).
 */
int QKafkaUtil::produceMessage(QString message)
{
    const std::string topicName = topic.toStdString();

    // createProducer() leaves the producer null on failure; refuse to dereference it.
    if (producer == nullptr)
    {
        std::cerr << "Failed to produce to topic " << topicName << ": " <<
            "producer has not been created" << std::endl;
        return static_cast<int>(RdKafka::ERR__STATE);
    }

    // Convert once and take the length from the converted bytes. QString::size()
    // counts UTF-16 code units, which is smaller than the encoded byte count for
    // non-ASCII text and would truncate the payload.
    const std::string payload = message.toStdString();

    // The API takes a non-const void*; with RK_MSG_COPY the library copies the
    // buffer, so casting away const here does not lead to a write through it.
    const RdKafka::ErrorCode retCode = producer->produce(
        topicName, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
        const_cast<char *>(payload.data()), payload.size(),
        nullptr, 0, 0, nullptr, nullptr);

    if (retCode != RdKafka::ERR_NO_ERROR)
    {
        std::cerr << "Failed to produce to topic " << topicName << ": " <<
            RdKafka::err2str(retCode) << std::endl;
    }
    else
    {
        std::cerr << "Enqueued message (" << payload.size() << " bytes) " <<
                     "for topic " << topicName << "[" << payload << "]" << std::endl;
    }

    return static_cast<int>(retCode);
}