|
@@ -0,0 +1,84 @@
|
|
|
+package com.fdkankan.disruptormq;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author Xiewj
|
|
|
+ * @date 2024/1/17
|
|
|
+ */
|
|
|
+
|
|
|
+import com.fdkankan.disruptormq.DisruptorExcepitonHandler;
|
|
|
+import com.lmax.disruptor.dsl.Disruptor;
|
|
|
+import com.lmax.disruptor.dsl.ProducerType;
|
|
|
+import org.springframework.beans.BeansException;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.context.ApplicationContextAware;
|
|
|
+import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.Optional;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 可以在sprinboot中使用,不同业务可以声明不同的DisruptorQueue,使用不同的Handler处理
|
|
|
+ */
|
|
|
+@SuppressWarnings("all")
|
|
|
+public abstract class BaseDisruptorQueue<T> implements ApplicationContextAware {
|
|
|
+ @Autowired
|
|
|
+ private DisruptorProperties disruptorProperties;
|
|
|
+ private Disruptor< DisruptorEvent<T>> disruptor;
|
|
|
+ private Map<String, DisruptorHandler> disruptorHandlerMap;
|
|
|
+
|
|
|
+ public void offer(T t) {
|
|
|
+ if (Objects.nonNull(t)) {
|
|
|
+ disruptor.publishEvent((event, sequence) -> event.setPayload(t));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void offer(List<T> ts) {
|
|
|
+ if (!CollectionUtils.isEmpty(ts)) {
|
|
|
+ ts.stream().filter(Objects::nonNull).forEach(this::offer);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void initDisruptor(String handlerName) {
|
|
|
+ this.disruptor = getDisruptor(handlerName);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Disruptor<DisruptorEvent<T>> getDisruptor(String handlerName) {
|
|
|
+ DisruptorProperties.HandlerProperties handlerProperties = getDisruptorHandlerConfig(handlerName);
|
|
|
+ Disruptor<DisruptorEvent<T>> disruptor = new Disruptor<>(DisruptorEvent::new, handlerProperties.getRingBufferSize(), //
|
|
|
+ new CustomizableThreadFactory(handlerName + "-disruptor-"), //
|
|
|
+ handlerProperties.isMultiProducer() ? ProducerType.MULTI : ProducerType.SINGLE, //
|
|
|
+ disruptorProperties.getWaitType().getWaitStrategy());
|
|
|
+ // 对于一条消息,handleEventsWith方法返回的EventHandlerGroup,Group中的每个消费者都会对消息进行消费,各个消费者之间不存在竞争(类似发布订阅)
|
|
|
+ // disruptor.handleEventsWith(getDisruptorHandler(handlerName));
|
|
|
+ // 对于一条消息,handleEventsWithWorkerPool方法返回的EventHandlerGroup,Group的消费者对于同一条消息不重复消费(类似点对点)
|
|
|
+ disruptor.handleEventsWithWorkerPool(getDisruptorHandler(handlerName));
|
|
|
+ disruptor.setDefaultExceptionHandler(new DisruptorExcepitonHandler<>());
|
|
|
+ disruptor.start();
|
|
|
+ Runtime.getRuntime().addShutdownHook(new Thread(disruptor::shutdown, handlerName + "-disruptor-shutdown"));
|
|
|
+ return disruptor;
|
|
|
+ }
|
|
|
+
|
|
|
+ private DisruptorProperties.HandlerProperties getDisruptorHandlerConfig(String handlerName) {
|
|
|
+ Map<String, DisruptorProperties.HandlerProperties> handlers = disruptorProperties.getHandlers();
|
|
|
+ DisruptorProperties.HandlerProperties handlerProperties = handlers.get(handlerName);
|
|
|
+ return Optional.ofNullable(handlerProperties).orElseGet(() -> {
|
|
|
+ DisruptorProperties.HandlerProperties defaultHandlerProperties = new DisruptorProperties.HandlerProperties();
|
|
|
+ defaultHandlerProperties.setRingBufferSize(1024 * 1024);
|
|
|
+ defaultHandlerProperties.setMultiProducer(false);
|
|
|
+ return defaultHandlerProperties;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private DisruptorHandler<T> getDisruptorHandler(String handlerName) {
|
|
|
+ return disruptorHandlerMap.get(handlerName);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
+ disruptorHandlerMap = applicationContext.getBeansOfType(DisruptorHandler.class);
|
|
|
+ }
|
|
|
+}
|