package com.casic.yizhuang.kafka;

import com.casic.yizhuang.util.KafkaUtils;
import com.casic.yizhuang.util.LoginUtil;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Sends a single string message to the Kafka topic named by
 * {@code KafkaProperties.SEND_DATA_TOPIC}.
 *
 * <p>Every call to {@link #send(String)} builds its own {@link KafkaProducer}, sends one record
 * synchronously and closes the producer again. No producer state is shared between calls.
 */
public class Producer extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);

    /**
     * Publishes {@code content} as the value of one record with an empty-string key and blocks
     * until the send future completes.
     *
     * <p>Send failures ({@link ExecutionException}) and interruptions are logged and NOT rethrown,
     * so a normal return does not guarantee the record was delivered. When the wait is
     * interrupted, the calling thread's interrupt flag is restored before returning.
     *
     * @param content the record value to send
     * @throws IOException declared by this method's signature; presumably raised by the security
     *     login step ({@code LoginUtil.securityPrepare}) - TODO confirm against LoginUtil
     */
    public static void send(String content) throws IOException {
        // NOTE: when security authentication is enabled, the principal and keytab below must be
        // changed manually to the machine-to-machine account that was applied for.
        if (LoginUtil.isSecurityModel()) {
            LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE);
        }

        Properties props = KafkaUtils.producerInitProperties();

        // A local producer (instead of a shared static field) keeps concurrent callers from
        // overwriting or closing each other's instance; try-with-resources closes it on every
        // exit path, including a failure while building the record.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)) {
            LOG.info("producer start.");

            ProducerRecord<String, String> record =
                    new ProducerRecord<>(KafkaProperties.SEND_DATA_TOPIC, "", content);

            LOG.info("kafka发送数据-------{}", content);
            // Synchronous send: wait for the returned future to complete.
            kafkaProducer.send(record).get();
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.error("The InterruptedException occured.", ie);
        } catch (ExecutionException ee) {
            LOG.error("The ExecutionException occured.", ee);
        }
    }
}