package com.casic.kafka.util;

import java.util.Properties;

/**
 * Builds the {@link Properties} objects used to create Kafka consumers and
 * producers against a SASL/Kerberos (SASL_PLAINTEXT) secured cluster.
 *
 * <p>Every setting is resolved through {@link KafkaProperties} so that a
 * deployment can override the defaults via external configuration.
 */
public class KafkaUtils {

    // ------------------------------------------------------------------
    // Kafka client configuration keys (canonical names; see the Kafka
    // ConsumerConfig / ProducerConfig documentation).
    // ------------------------------------------------------------------

    // Broker connection address list
    private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
    // Consumer group id
    private final static String GROUP_ID = "group.id";
    // Deserializer class for message values
    private final static String VALUE_DESERIALIZER = "value.deserializer";
    // Deserializer class for message keys
    private final static String KEY_DESERIALIZER = "key.deserializer";
    // Security protocol: SASL_PLAINTEXT or PLAINTEXT are supported
    private final static String SECURITY_PROTOCOL = "security.protocol";
    // Kerberos service name of the Kafka service
    private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
    // Kerberos domain name (vendor-specific key, e.g. Huawei MRS clusters)
    private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
    // Whether offsets are committed automatically
    private final static String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    // Interval between automatic offset commits (ms)
    private final static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
    // Session timeout (ms)
    private final static String SESSION_TIMEOUT_MS = "session.timeout.ms";
    // Client id
    private final static String CLIENT_ID = "client.id";
    // Serializer class for message keys
    private final static String KEY_SERIALIZER = "key.serializer";
    // Serializer class for message values
    private final static String VALUE_SERIALIZER = "value.serializer";
    // Partitioner class
    private final static String PARTITIONER_NAME = "partitioner.class";
    // JVM system property holding the JAAS login configuration file path
    private final static String JAAS_CONFIG_PROPERTY = "java.security.auth.login.config";

    // Default number of messages to send (unused in this class; kept for
    // reference by demo callers — TODO confirm before removing)
    private final static int MESSAGE_NUM = 100;

    /**
     * Keytab file name of the machine-machine account requested by the user.
     */
    private static final String USER_KEYTAB_FILE = "user.keytab";

    /**
     * Principal name of the machine-machine account requested by the user.
     */
    private static final String USER_PRINCIPAL = "kafkauser";

    /** Utility class — not meant to be instantiated. */
    private KafkaUtils() {
    }

    /**
     * Builds the configuration for a Kerberos-secured Kafka consumer.
     *
     * <p>All values fall back to demo defaults when not supplied by
     * {@link KafkaProperties}.
     *
     * @return consumer {@link Properties}, never {@code null}
     */
    public static Properties consumerInitProperties() {
        Properties props = new Properties();
        KafkaProperties kafkaProc = KafkaProperties.getInstance();

        // Broker connection address
        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        // Consumer group id
        props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
        // Commit offsets automatically
        props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
        // Interval between automatic offset commits
        props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS, "1000"));
        // Session timeout
        props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
        // Deserializer class for message keys
        props.put(KEY_DESERIALIZER, kafkaProc.getValues(KEY_DESERIALIZER,
                "org.apache.kafka.common.serialization.StringDeserializer"));
        // Deserializer class for message values
        props.put(VALUE_DESERIALIZER, kafkaProc.getValues(VALUE_DESERIALIZER,
                "org.apache.kafka.common.serialization.StringDeserializer"));
        // Security protocol
        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
        // Kerberos service name
        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
        // Kerberos domain name
        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));

        return props;
    }

    /**
     * Builds the configuration for a Kerberos-secured Kafka producer.
     *
     * <p>All values fall back to demo defaults when not supplied by
     * {@link KafkaProperties}. If the JVM was not started with
     * {@code -Djava.security.auth.login.config=...}, a default JAAS
     * configuration file path is installed as a fallback.
     *
     * @return producer {@link Properties}, never {@code null}
     */
    public static Properties producerInitProperties() {
        Properties props = new Properties();
        KafkaProperties kafkaProc = KafkaProperties.getInstance();

        // Broker connection address list
        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        // Client id
        props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
        // Serializer class for message keys
        props.put(KEY_SERIALIZER, kafkaProc.getValues(KEY_SERIALIZER,
                "org.apache.kafka.common.serialization.StringSerializer"));
        // Serializer class for message values
        props.put(VALUE_SERIALIZER, kafkaProc.getValues(VALUE_SERIALIZER,
                "org.apache.kafka.common.serialization.StringSerializer"));
        // Security protocol: SASL_PLAINTEXT or PLAINTEXT are supported
        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
        // Kerberos service name
        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
        // Kerberos domain name
        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
        // Partitioner class
        props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME,
                "com.casic.kafka.util.SimplePartitioner"));

        // NOTE(review): the JAAS config path below is machine-specific. Only
        // install it when the JVM was not already configured, so an
        // externally supplied -Djava.security.auth.login.config wins.
        if (System.getProperty(JAAS_CONFIG_PROPERTY) == null) {
            System.setProperty(JAAS_CONFIG_PROPERTY,
                    "D:\\casic203\\software\\software\\data-creater\\kafka\\kafkaClient.jaas.conf");
        }

        return props;
    }
}