package com.casic.kafka; import com.casic.kafka.util.KafkaProperties; import com.casic.kafka.util.KafkaUtils; import com.casic.kafka.util.LoginUtil; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Properties; import java.util.concurrent.ExecutionException; public class Producer { private static final Logger LOG = LoggerFactory.getLogger(Producer.class); private static KafkaProducer<String, String> producer; static { try{ if (LoginUtil.isSecurityModel()) { LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); } Properties props = KafkaUtils.producerInitProperties(); producer = new KafkaProducer<String, String>(props); }catch (IOException ex){ } } public static void send(String content, String topic){ LOG.debug("producer start."); if (producer == null) { //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 Properties props = KafkaUtils.producerInitProperties(); producer = new KafkaProducer<String, String>(props); } ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "", content); try { // 同步发送 producer.send(record).get(); LOG.info("主题为" + topic + ":kafka发送数据内容-------" + content); } catch (InterruptedException ie) { LOG.info("The InterruptedException occured : {}.", ie); } catch (ExecutionException ee) { LOG.info("The ExecutionException occured : {}.", ee); } // producer.close(); // LOG.info("主题为" + topic + ":kafka发送数据内容-------" + content); } }