Newer
Older
yizhuang / src / main / java / com / casic / yizhuang / kafka / Producer.java
package com.casic.yizhuang.kafka;

import com.casic.yizhuang.util.KafkaUtils;
import com.casic.yizhuang.util.LoginUtil;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer extends Thread {

    private static final Logger LOG = LoggerFactory.getLogger(com.casic.yizhuang.kafka.Producer.class);

    private static KafkaProducer<String, String> producer;

    public static void send(String content) throws IOException {
        //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
        if (LoginUtil.isSecurityModel()) {
            LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE);
        }
        Properties props = KafkaUtils.producerInitProperties();
        // 发布
        producer = new KafkaProducer<String, String>(props);
        LOG.info("producer start.");
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(KafkaProperties.SEND_DATA_TOPIC, "", content);
        try {
            LOG.info("kafka发送数据-------"+ content);
            // 同步发送
            producer.send(record).get();
        } catch (InterruptedException ie) {
            LOG.info("The InterruptedException occured : {}.", ie);
        } catch (ExecutionException ee) {
            LOG.info("The ExecutionException occured : {}.", ee);
        }finally {
            producer.close();
        }

    }

}