Newer
Older
sink / src / main / java / org / flume / alarm / mq / RocketMQConsumer.java
zhout on 2 Mar 2022 2 KB first commit
package org.flume.alarm.mq;


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.UUID;

/**
 * rocketMq消费者
 * 
 * @author : XIAOZF
 * @date : 2018年4月3日下午5:40:57
 */
public class RocketMQConsumer {
	private DefaultMQPushConsumer consumer;

	private MessageListener listener;

	private String nameServer;

	private String groupName;

	private String topics;
	
	@Autowired
	private MessageListener messageListener;
	
	public RocketMQConsumer(){
		
	}

	public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) {
		this.listener = listener;
		this.nameServer = nameServer;
		this.groupName = groupName;
		this.topics = topics;
	}

	public void init() {
		consumer = new DefaultMQPushConsumer(groupName);
		consumer.setNamesrvAddr(nameServer);
		try {
			consumer.subscribe(topics, "*");
		} catch (MQClientException e) {
			e.printStackTrace();
		}
		consumer.setInstanceName(UUID.randomUUID().toString());
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
		consumer.registerMessageListener((MessageListenerConcurrently)messageListener);
		try {
			consumer.start();
		} catch (MQClientException e) {
			e.printStackTrace();
		}
	}
	
	public void initTest() {
		consumer = new DefaultMQPushConsumer(groupName);
		consumer.setNamesrvAddr(nameServer);
		try {
			consumer.subscribe(topics, "*");
		} catch (MQClientException e) {
			e.printStackTrace();
		}
		consumer.setInstanceName(UUID.randomUUID().toString());
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
		consumer.registerMessageListener((MessageListenerConcurrently) this.listener);

		try {
			consumer.start();
		} catch (MQClientException e) {
			e.printStackTrace();
		}
		System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance="
				+ consumer.getInstanceName());
	}
	
	public void destory(){
		consumer.shutdown();
	}

	public String getNameServer() {
		return nameServer;
	}

	public void setNameServer(String nameServer) {
		this.nameServer = nameServer;
	}

	public String getGroupName() {
		return groupName;
	}

	public void setGroupName(String groupName) {
		this.groupName = groupName;
	}

	public String getTopics() {
		return topics;
	}

	public void setTopics(String topics) {
		this.topics = topics;
	}
}