/*
 * Source: device-data-creator/src/main/java/com/casic/kafka/util/KafkaUtils.java
 * Last change: chaizhuang, 20 Jul 2023 (5 KB) - "add Kafka sending; out-of-order timestamps"
 */
package com.casic.kafka.util;


import java.util.Properties;

/**
 * Builds the {@link Properties} objects used to configure Kafka consumers and producers.
 *
 * <p>Most values are looked up through {@code KafkaProperties.getValues(key, fallback)};
 * presumably that returns the configured value or the supplied fallback (confirm against
 * {@code KafkaProperties}). The Kerberos service name is always the literal {@code "kafka"}.
 *
 * <p>NOTE(review): every method here is static, but the implicit public constructor is kept
 * so that any existing {@code new KafkaUtils()} call sites continue to compile.
 */
public class KafkaUtils {

    // Maximum wait time for a single request (ms). Not referenced anywhere in this class.
    private final int waitTime = 1000;

    // Broker connection address
    private static final String BOOTSTRAP_SERVER = "bootstrap.servers";

    // Consumer group id
    private static final String GROUP_ID = "group.id";

    // Deserializer class for the message value
    private static final String VALUE_DESERIALIZER = "value.deserializer";

    // Deserializer class for the message key
    private static final String KEY_DESERIALIZER = "key.deserializer";

    // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT can be configured
    private static final String SECURITY_PROTOCOL = "security.protocol";

    // Kerberos service name
    private static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";

    // Kerberos domain name
    private static final String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";

    // Whether offsets are committed automatically
    private static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Interval between automatic offset commits
    private static final String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";

    // Session timeout
    private static final String SESSION_TIMEOUT_MS = "session.timeout.ms";

    // Client id
    private static final String CLIENT_ID = "client.id";

    // Serializer class for the message key
    private static final String KEY_SERIALIZER = "key.serializer";

    // Serializer class for the message value
    private static final String VALUE_SERIALIZER = "value.serializer";

    // Partitioner class name
    private static final String PARTITIONER_NAME = "partitioner.class";

    // Default number of messages to send (100). Not referenced anywhere in this class.
    private static final int MESSAGE_NUM = 100;

    /**
     * Keytab file name of the machine-to-machine account requested by the user.
     * Not referenced anywhere in this class.
     */
    private static final String USER_KEYTAB_FILE = "user.keytab";

    /**
     * Name of the machine-to-machine account requested by the user.
     * Not referenced anywhere in this class.
     */
    private static final String USER_PRINCIPAL = "kafkauser";

    // JVM system property that points at the JAAS login configuration file
    private static final String JAAS_CONFIG_PROPERTY = "java.security.auth.login.config";

    // Fallback JAAS file location, used only when the system property has not been set
    private static final String DEFAULT_JAAS_CONFIG_PATH =
            "D:\\casic203\\software\\software\\data-creater\\kafka\\kafkaClient.jaas.conf";

    /**
     * Creates the configuration for a Kafka consumer.
     *
     * <p>Unlike {@link #producerInitProperties()}, this method does not touch the
     * {@code java.security.auth.login.config} system property.
     *
     * @return a new {@link Properties} holding the broker address, group id, offset-commit
     *         settings, session timeout, key/value deserializers and security settings
     */
    public static Properties consumerInitProperties() {
        Properties props = new Properties();
        KafkaProperties kafkaProc = KafkaProperties.getInstance();

        // Broker connection address
        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        // Consumer group id
        props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
        // Whether offsets are committed automatically
        props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
        // Interval between automatic offset commits
        props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS, "1000"));
        // Session timeout
        props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
        // Deserializer class for the message key
        props.put(KEY_DESERIALIZER,
                kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
        // Deserializer class for the message value
        props.put(VALUE_DESERIALIZER,
                kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));

        putSecurityProperties(props, kafkaProc);

        return props;
    }

    /**
     * Creates the configuration for a Kafka producer.
     *
     * <p>Side effect: if the {@code java.security.auth.login.config} system property is unset
     * or empty, it is set to a built-in default path. A value that is already present (for
     * example supplied with {@code -D} at JVM start) is left untouched.
     *
     * @return a new {@link Properties} holding the broker address, client id, key/value
     *         serializers, security settings and partitioner class
     */
    public static Properties producerInitProperties() {
        Properties props = new Properties();
        KafkaProperties kafkaProc = KafkaProperties.getInstance();

        // Broker address list
        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        // Client id
        props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
        // Serializer class for the message key
        props.put(KEY_SERIALIZER,
                kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
        // Serializer class for the message value
        props.put(VALUE_SERIALIZER,
                kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));

        putSecurityProperties(props, kafkaProc);

        // Partitioner class name
        props.put(PARTITIONER_NAME,
                kafkaProc.getValues(PARTITIONER_NAME, "com.casic.kafka.util.SimplePartitioner"));

        ensureJaasConfig();

        return props;
    }

    /** Adds the security protocol, Kerberos service name and Kerberos domain name to {@code props}. */
    private static void putSecurityProperties(Properties props, KafkaProperties kafkaProc) {
        // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT can be configured
        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
        // Kerberos service name (fixed value, not read from KafkaProperties)
        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
        // Kerberos domain name
        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
    }

    /** Points the JVM at the default JAAS file unless a location has already been configured. */
    private static void ensureJaasConfig() {
        String configured = System.getProperty(JAAS_CONFIG_PROPERTY);
        // Only fall back to the built-in path; never clobber an externally supplied setting.
        if (configured == null || configured.isEmpty()) {
            System.setProperty(JAAS_CONFIG_PROPERTY, DEFAULT_JAAS_CONFIG_PATH);
        }
    }

}