RabbitMqProducer.java 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package com.fdkankan.rabbitmq.util;
  2. import cn.hutool.json.JSONObject;
  3. import com.rabbitmq.client.AMQP.Queue.DeclareOk;
  4. import com.rabbitmq.client.Channel;
  5. import lombok.Data;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.amqp.rabbit.connection.CorrelationData;
  9. import org.springframework.amqp.rabbit.core.ChannelCallback;
  10. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import java.util.UUID;
  14. /**
  15. * <p>
  16. * TODO
  17. * </p>
  18. *
  19. * @author dengsixing
  20. * @since 2022/4/19
  21. **/
  22. @Component
  23. @Data
  24. public class RabbitMqProducer {
  25. private Logger log = LoggerFactory.getLogger(this.getClass().getName());
  26. // @Autowired
  27. // ModelingQueueConfig modelingQueueConfig;
  28. @Autowired
  29. private RabbitTemplate rabbitTemplate;
  30. /**
  31. * 工作队列模式发送消息
  32. * @param queue 队列名
  33. * @param content 载荷
  34. */
  35. public void sendByWorkQueue(String queue, Object content){
  36. String messageId = UUID.randomUUID().toString();
  37. log.info("开始发送Mq消息,messageId:{},消息队列:{},消息内容:{}",messageId, queue, new JSONObject(content).toString());
  38. rabbitTemplate.convertAndSend(queue, content, message -> {
  39. message.getMessageProperties().setMessageId(messageId);
  40. return message;
  41. }, new CorrelationData(messageId));
  42. }
  43. /**
  44. * 工作队列模式发送消息
  45. * @param exchange 交换机
  46. * @param queue 队列名
  47. * @param content 载荷
  48. */
  49. public void sendByWorkQueue(String exchange,String queue, Object content){
  50. String messageId = UUID.randomUUID().toString();
  51. log.info("开始发送Mq消息,messageId:{},消息队列:{},消息内容:{}",messageId, queue, new JSONObject(content).toString());
  52. rabbitTemplate.convertAndSend(exchange,queue, content, message -> {
  53. message.getMessageProperties().setMessageId(messageId);
  54. return message;
  55. }, new CorrelationData(messageId));
  56. }
  57. /**
  58. * 工作队列模式发送带优先级消息
  59. * @param queue 队列名
  60. * @param content 载荷
  61. * @param priority 优先级,正整数,值越大优先级越高,当值大于队列设置的最大优先级时,效果等同于最大优先级
  62. */
  63. public void sendByWorkQueue(String queue, Object content, Integer priority){
  64. String messageId = UUID.randomUUID().toString();
  65. log.info("开始发送Mq消息,messageId:{},消息队列:{},消息内容:{}",messageId, queue, new JSONObject(content).toString());
  66. rabbitTemplate.convertAndSend(queue, content, message -> {
  67. message.getMessageProperties().setMessageId(messageId);
  68. message.getMessageProperties().setPriority(priority);
  69. return message;
  70. }, new CorrelationData(messageId));
  71. }
  72. /**
  73. * 获取队列未被消费的消息数量
  74. * @return
  75. */
  76. public int getMessageCount(String queueName) {
  77. DeclareOk declareOk = rabbitTemplate.execute(new ChannelCallback<DeclareOk>() {
  78. public DeclareOk doInRabbit(Channel channel) throws Exception {
  79. return channel.queueDeclarePassive(queueName);
  80. }
  81. });
  82. return declareOk.getMessageCount();
  83. }
  84. /**
  85. * 工作队列模式发送消息到多个队列
  86. * @param queue 队列名
  87. * @param content 载荷
  88. */
  89. public void sendByWorkToQueues(Object content, String... queues){
  90. for (String queue : queues) {
  91. String messageId = UUID.randomUUID().toString();
  92. log.info("开始发送Mq消息,messageId:{},消息队列:{},消息内容:{}",messageId, queue, new JSONObject(content).toString());
  93. rabbitTemplate.convertAndSend(queue, content, message -> {
  94. message.getMessageProperties().setMessageId(messageId);
  95. return message;
  96. }, new CorrelationData(messageId));
  97. }
  98. }
  99. }