ModelingMsgProducer.java 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package com.fdkankan.base.mq;
  2. import lombok.extern.log4j.Log4j2;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.rabbit.core.ChannelCallback;
  6. import org.springframework.amqp.rabbit.support.CorrelationData;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import com.rabbitmq.client.AMQP;
  10. import com.rabbitmq.client.Channel;
  11. import java.util.UUID;
  12. @Log4j2
  13. @Component
  14. public class ModelingMsgProducer implements RabbitTemplate.ConfirmCallback {
  15. @Autowired
  16. private Queue workQueue;
  17. private RabbitTemplate rabbitTemplate;
  18. @Autowired
  19. public ModelingMsgProducer(RabbitTemplate rabbitTemplate) {
  20. this.rabbitTemplate = rabbitTemplate;
  21. this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
  22. }
  23. @Override
  24. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  25. log.info(" 回调id:" + correlationData);
  26. if (ack) {
  27. log.info("消息成功消费");
  28. } else {
  29. log.info("消息消费失败:" + cause);
  30. }
  31. }
  32. public void sendMsg(String content) {
  33. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  34. rabbitTemplate.convertAndSend(workQueue.getName(), (Object) content, correlationId);
  35. // rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_MODELING, RabbitConfig.ROUTINGKEY_MODELING_A, content, correlationId);
  36. }
  37. public void sendMsg2(String content) {
  38. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  39. rabbitTemplate.convertAndSend("QUEUE_VIDEO_A", (Object) content, correlationId);
  40. }
  41. /**
  42. * 获取mq的数量
  43. * @return
  44. */
  45. public int getMessageCount() {
  46. AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(new ChannelCallback<AMQP.Queue.DeclareOk>() {
  47. public AMQP.Queue.DeclareOk doInRabbit(Channel channel) throws Exception {
  48. return channel.queueDeclarePassive(workQueue.getName());
  49. }
  50. });
  51. return declareOk.getMessageCount();
  52. }
  53. }