12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- package com.fdkankan.base.mq;
- import lombok.extern.log4j.Log4j2;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.core.ChannelCallback;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import java.util.UUID;
- @Log4j2
- @Component
- public class ModelingMsgProducer implements RabbitTemplate.ConfirmCallback {
- // @Autowired
- // private Queue workQueue;
- private RabbitTemplate rabbitTemplate;
- @Value("${rabbitmq.queue}")
- private String queue;
- @Autowired
- public ModelingMsgProducer(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
- }
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- log.info(" 回调id:" + correlationData);
- if (ack) {
- log.info("消息成功消费");
- } else {
- log.info("消息消费失败:" + cause);
- }
- }
- public void sendMsg(String content) {
- CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
- // rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_MODELING, RabbitConfig.ROUTINGKEY_MODELING_A, content, correlationId);
- }
- public void sendMsg2(String content) {
- CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend("QUEUE_VIDEO_A", (Object) content, correlationId);
- }
- /**
- * 获取mq的数量
- * @return
- */
- public int getMessageCount() {
- AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(new ChannelCallback<AMQP.Queue.DeclareOk>() {
- public AMQP.Queue.DeclareOk doInRabbit(Channel channel) throws Exception {
- return channel.queueDeclarePassive(queue);
- }
- });
- return declareOk.getMessageCount();
- }
- }
|