|
@@ -0,0 +1,120 @@
|
|
|
+package com.fdkk.sxz.util;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import sun.misc.BASE64Encoder;
|
|
|
+
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.net.HttpURLConnection;
|
|
|
+import java.net.URL;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author Xiewj
|
|
|
+ * @date 2022/3/14
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class RabbitMqUtil {
|
|
|
+
|
|
|
+ @Value("${spring.rabbitmq.host}")
|
|
|
+ private String host;
|
|
|
+ @Value("${spring.rabbitmq.apiport}")
|
|
|
+ private String port;
|
|
|
+ @Value("${spring.rabbitmq.username}")
|
|
|
+ private String username;
|
|
|
+ @Value("${spring.rabbitmq.password}")
|
|
|
+ private String password;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 队列任务总数 * * @param queueName * @return
|
|
|
+ */
|
|
|
+ public int getMessageCount(String queueName) throws IOException {
|
|
|
+ String apiMessage = getApiMessage(queueName);
|
|
|
+ if (Objects.equals(apiMessage, "")) {
|
|
|
+ RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ JSONObject jsonObject = JSON.parseObject(apiMessage);
|
|
|
+ return Integer.parseInt(jsonObject.get("messages").toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 队列ready任务数 * * @param queueName * @return
|
|
|
+ */
|
|
|
+ public int getMessageReadyCount(String queueName) throws IOException {
|
|
|
+ String apiMessage = getApiMessage(queueName);
|
|
|
+ if (Objects.equals(apiMessage, "")) {
|
|
|
+ RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ JSONObject jsonObject = JSON.parseObject(apiMessage);
|
|
|
+ return Integer.parseInt(jsonObject.get("messages_ready").toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 队列unack数MQ * * @param queueName * @return
|
|
|
+ */
|
|
|
+ public int getMessagesUnacknowledgedCount(String queueName) throws IOException {
|
|
|
+ String apiMessage = getApiMessage(queueName);
|
|
|
+ if (Objects.equals(apiMessage, "")) {
|
|
|
+ RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ JSONObject jsonObject = JSON.parseObject(apiMessage);
|
|
|
+ return Integer.parseInt(jsonObject.get("messages_unacknowledged").toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取队列消息总数、ready消息数、unack消息数 * * @param queueName * @return Map<String,Integer>
|
|
|
+ */
|
|
|
+ public Map<String, Integer> getMQCountMap(String queueName) throws IOException {
|
|
|
+ String apiMessage = getApiMessage(queueName);
|
|
|
+ JSONObject jsonObject = JSON.parseObject(apiMessage);
|
|
|
+ Map<String, Integer> map = new HashMap<>();
|
|
|
+ map.put("messages", Integer.parseInt(jsonObject.get("messages").toString()));
|
|
|
+ map.put("messages_ready", Integer.parseInt(jsonObject.get("messages_ready").toString()));
|
|
|
+ map.put("messages_unacknowledged", Integer.parseInt(jsonObject.get("messages_unacknowledged").toString()));
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getApiMessage(String queueName) throws IOException {
|
|
|
+ //发送一个GET请求
|
|
|
+ HttpURLConnection httpConn = null;
|
|
|
+ BufferedReader in = null;
|
|
|
+
|
|
|
+ String urlString = "http://" + host + ":" + port + "/api/queues/%2f/" + queueName;
|
|
|
+ URL url = new URL(urlString);
|
|
|
+ httpConn = (HttpURLConnection) url.openConnection();
|
|
|
+ //设置用户名密码
|
|
|
+ String auth = username + ":" + password;
|
|
|
+ BASE64Encoder enc = new BASE64Encoder();
|
|
|
+ String encoding = enc.encode(auth.getBytes());
|
|
|
+ httpConn.setDoOutput(true);
|
|
|
+ httpConn.setRequestProperty("Authorization", "Basic " + encoding);
|
|
|
+ // 创建实际的链接
|
|
|
+ httpConn.connect();
|
|
|
+ //读取响应
|
|
|
+ if (httpConn.getResponseCode() == HttpURLConnection.HTTP_OK) {
|
|
|
+ StringBuilder content = new StringBuilder();
|
|
|
+ String tempStr = "";
|
|
|
+ in = new BufferedReader(new InputStreamReader(httpConn.getInputStream()));
|
|
|
+ while ((tempStr = in.readLine()) != null) {
|
|
|
+ content.append(tempStr);
|
|
|
+ }
|
|
|
+ in.close();
|
|
|
+ httpConn.disconnect();
|
|
|
+ return content.toString();
|
|
|
+ } else {
|
|
|
+ httpConn.disconnect();
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|