package com.fdkankan.base.mq; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.ChannelCallback; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import java.util.UUID; @Log4j2 @Component public class ModelingMsgProducer implements RabbitTemplate.ConfirmCallback { // @Autowired // private Queue workQueue; private RabbitTemplate rabbitTemplate; @Value("${rabbitmq.queue}") private String queue; @Autowired public ModelingMsgProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容 } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info(" 回调id:" + correlationData); if (ack) { log.info("消息成功消费"); } else { log.info("消息消费失败:" + cause); } } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId); // rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_MODELING, RabbitConfig.ROUTINGKEY_MODELING_A, content, correlationId); } public void sendMsg2(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("QUEUE_VIDEO_A", (Object) content, correlationId); } /** * 获取mq的数量 * @return */ public int getMessageCount() { AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(new ChannelCallback() { public AMQP.Queue.DeclareOk doInRabbit(Channel channel) throws Exception { return channel.queueDeclarePassive(queue); } }); return declareOk.getMessageCount(); } }