Newer
Older
device-data-creator / src / main / java / com / casic / kafka / util / KafkaProperties.java
chaizhuang on 20 Jul 2023 3 KB 新增kafka发送,时间乱序
package com.casic.kafka.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public final class KafkaProperties {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);

    // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限
    public final static String DATA_TOPIC = "TEMPSTORE_8204";
    public final static String ALARM_TOPIC = "MSGQUEUE_8287";

    /**
     * 用户自己申请的机机账号keytab文件名称
     */
    public static final String USER_KEYTAB_FILE = "user.keytab";

    /**
     * 用户自己申请的机机账号名称
     */
    public static final String USER_PRINCIPAL = "kafkauser";

    private static Properties serverProps = new Properties();

    private static Properties producerProps = new Properties();

    private static Properties consumerProps = new Properties();

    private static Properties clientProps = new Properties();

    private static KafkaProperties instance = null;

    private  static final String filePath = "D:\\casic203\\software\\software\\data-creater\\kafka\\";
    private KafkaProperties() {
//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
        try {
            File proFile = new File(filePath + "producer.properties");

            if (proFile.exists()) {
                producerProps.load(new FileInputStream(filePath + "producer.properties"));
            }

            File conFile = new File(filePath + "producer.properties");

            if (conFile.exists()) {
                consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
            }

            File serFile = new File(filePath + "server.properties");

            if (serFile.exists()) {
                serverProps.load(new FileInputStream(filePath + "server.properties"));
            }

            File cliFile = new File(filePath + "client.properties");

            if (cliFile.exists()) {
                clientProps.load(new FileInputStream(filePath + "client.properties"));
            }
        } catch (IOException e) {
            LOG.info("The Exception occured.", e);
        }
    }

    public synchronized static KafkaProperties getInstance() {
        if (null == instance) {
            instance = new KafkaProperties();
        }
        return instance;
    }

    /**
     * 获取参数值
     *
     * @param key      properites的key值
     * @param defValue 默认值
     * @return
     */
    public String getValues(String key, String defValue) {
        String rtValue = null;

        if (null == key) {
            LOG.error("key is null");
        } else {
            rtValue = getPropertiesValue(key);
        }

        if (null == rtValue) {
            LOG.warn("KafkaProperties.getValues return null, key is " + key);
            rtValue = defValue;
        }

        LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);

        return rtValue;
    }

    /**
     * 根据key值获取server.properties的值
     *
     * @param key
     * @return
     */
    private String getPropertiesValue(String key) {
        String rtValue = serverProps.getProperty(key);

        // server.properties中没有,则再向producer.properties中获取
        if (null == rtValue) {
            rtValue = producerProps.getProperty(key);
        }

        // producer中没有,则再向consumer.properties中获取
        if (null == rtValue) {
            rtValue = consumerProps.getProperty(key);
        }

        // consumer没有,则再向client.properties中获取
        if (null == rtValue) {
            rtValue = clientProps.getProperty(key);
        }

        return rtValue;
    }
}