Newer
Older
sink / src / main / java / org / flume / alarm / mq / RocketMQProducer.java
zhout on 2 Mar 2022 3 KB first commit
package org.flume.alarm.mq;


import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Async
@Service
public class RocketMQProducer {

    private DefaultMQProducer sender;

    private String nameServer;

    private String groupName;

    private String topics;

    private String accessKey;

    private String secretKey;

    public void init() {
        this.nameServer = "11.100.7.9:9876";
        this.accessKey = "73271a637f65500ce1a997bb03eaa1b4";
        this.secretKey = "9cb3f04063c2a637dc1c712dea8c5b6a";
        this.topics = "RE-A001";
        this.groupName = "RE-A001-CONSUMER";
        sender = new DefaultMQProducer(groupName, getAclRPCHook());
        sender.setNamesrvAddr(nameServer);
        sender.setInstanceName(UUID.randomUUID().toString());
        try {
            sender.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public RocketMQProducer() {

    }

    public RocketMQProducer(String nameServer, String groupName, String topics) {
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    public void sendMsg(String msg, String tags) {
        Message message = new Message(topics, tags, msg.getBytes(StandardCharsets.UTF_8));
//		Message message = new Message();
//        message.setBody(msg.getBytes());
        send(message);
        System.out.println("发送消息****" + msg);

        System.out.println("发送消息****" + message);

    }

    public void send(Message message) {
        if (null == sender) this.init();
        message.setTopic(topics);
        try {
            SendResult result = sender.send(message);
            SendStatus status = result.getSendStatus();
            System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public void destory() {
        sender.shutdown();
    }

    public String getNameServer() {
        return nameServer;
    }

    public void setNameServer(String nameServer) {
        this.nameServer = nameServer;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getTopics() {
        return topics;
    }

    public void setTopics(String topics) {
        this.topics = topics;
    }

    public String getAccessKey() {
        return accessKey;
    }

    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }
}