ModelingMsgProducer.java 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package com.fdkankan.base.mq;
  2. import lombok.extern.log4j.Log4j2;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.amqp.rabbit.core.ChannelCallback;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.stereotype.Component;
  10. import com.rabbitmq.client.AMQP;
  11. import com.rabbitmq.client.Channel;
  12. import java.util.UUID;
  13. @Log4j2
  14. @Component
  15. public class ModelingMsgProducer implements RabbitTemplate.ConfirmCallback {
  16. // @Autowired
  17. // private Queue workQueue;
  18. private RabbitTemplate rabbitTemplate;
  19. @Value("${rabbitmq.queue}")
  20. private String queue;
  21. @Autowired
  22. public ModelingMsgProducer(RabbitTemplate rabbitTemplate) {
  23. this.rabbitTemplate = rabbitTemplate;
  24. this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
  25. }
  26. @Override
  27. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  28. log.info(" 回调id:" + correlationData);
  29. if (ack) {
  30. log.info("消息成功消费");
  31. } else {
  32. log.info("消息消费失败:" + cause);
  33. }
  34. }
  35. public void sendMsg(String content) {
  36. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  37. rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
  38. // rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_MODELING, RabbitConfig.ROUTINGKEY_MODELING_A, content, correlationId);
  39. }
  40. public void sendMsg2(String content) {
  41. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  42. rabbitTemplate.convertAndSend("QUEUE_VIDEO_A", (Object) content, correlationId);
  43. }
  44. /**
  45. * 获取mq的数量
  46. * @return
  47. */
  48. public int getMessageCount() {
  49. AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(new ChannelCallback<AMQP.Queue.DeclareOk>() {
  50. public AMQP.Queue.DeclareOk doInRabbit(Channel channel) throws Exception {
  51. return channel.queueDeclarePassive(queue);
  52. }
  53. });
  54. return declareOk.getMessageCount();
  55. }
  56. }