public class RabbitAutoConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties,
ObjectProvider connectionNameStrategy) throws Exception {
PropertyMapper map = PropertyMapper.get();
CachingConnectionFactory factory = new CachingConnectionFactory(getRabbitConnectionFactoryBean(properties).getObject());
// 设置属性值
return factory;
}
private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception {
PropertyMapper map = PropertyMapper.get();
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
// ... 设置属性值
factory.afterPropertiesSet();
return factory;
}
}
}
public class RabbitAutoConfiguration {
@Configuration(proxyBeanMethods = false)
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {
// 可自定义该类来自定义自己的实现
@Bean
@ConditionalOnMissingBean
public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties,
ObjectProvider messageConverter,
ObjectProvider retryTemplateCustomizers) {
RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer();
// ... 配置消息转换
return configurer;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
return template;
}
}
}
以上配置比较简单,都是一些基本的配置,配置Rabbit的连接工厂,配置Template,客户端操作的模版RabbitTemplate对象。
// 消息监听的核心就在导入的类中
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
}
注解核心配置
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
private final ObjectProvider messageConverter;
private final ObjectProvider messageRecoverer;
private final ObjectProvider retryTemplateCustomizers;
private final RabbitProperties properties;
RabbitAnnotationDrivenConfiguration(ObjectProvider messageConverter,
ObjectProvider messageRecoverer,
ObjectProvider retryTemplateCustomizers, RabbitProperties properties) {
this.messageConverter = messageConverter;
this.messageRecoverer = messageRecoverer;
this.retryTemplateCustomizers = retryTemplateCustomizers;
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
configurer.setRetryTemplateCustomizers(this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()));
configurer.setRabbitProperties(this.properties);
return configurer;
}
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",matchIfMissing = true)
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
// 这里还有一个DirectRabbitListenerContainerFactory配置
// 消息监听容器有两种一种是SIMPLE,还有一种是DIRECT
@Configuration(proxyBeanMethods = false)
// 该注解重点,是由它的@Import来解析@RabbitListener注解
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableRabbitConfiguration {
}
}
public final class SimpleRabbitListenerContainerFactoryConfigurer
extends AbstractRabbitListenerContainerFactoryConfigurer {
@Override
public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
// 以下会为SimpleRabbitListenerContainerFactory工厂配置基础信息(通过配置文件设置的信息)
RabbitProperties.SimpleContainer config = getRabbitProperties().getListener().getSimple();
configure(factory, connectionFactory, config);
// 设置并发信息
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
}
}
主要注册一个BeanPostProcessor和RabbitListenerEndpointRegistry创建消息监听容器管理生命周期。
@Import(RabbitListenerConfigurationSelector.class)
public @interface EnableRabbit {
}
@Order
public class RabbitListenerConfigurationSelector implements DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
// 注册如下Bean对象
return new String[] { RabbitBootstrapConfiguration.class.getName() };
}
}
RabbitBootstrapConfiguration.java
public class RabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
// 注册一个BeanPostProcessor对象,用来处理@RabbitListener注解
registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor.class));
}
if (!registry.containsBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
// 注册消息监听容器MessageListenerContainer对象,同时还管理它们的生命周期
// MessageListenerContainer实现了SmartLifecycle接口
//(生命周期接口,在Spring中如果只是实现Lifecycle接口是没有作用的,只能实现SmartLifecycle接口)
registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(RabbitListenerEndpointRegistry.class));
}
}
}
public class RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, SmartInitializingSingleton {
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
// 获取目标类(如果是代理类)
Class<?> targetClass = AopUtils.getTargetClass(bean);
// 在targetClass类上及内部方法上查找@RabbitListener注解
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
// 在buildMetadata方法中解析完后,开始遍历处理所有的监听方法
for (ListenerMethod lm : metadata.listenerMethods) {
// 由于@RabbitListener可以重复定义(一个方法上可有有多个该注解),所有这里是个数组
for (RabbitListener rabbitListener : lm.annotations) {
// 处理消息监听
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}
private TypeMetadata buildMetadata(Class<?> targetClass) {
Collection classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List methods = new ArrayList<>();
final List multiMethods = new ArrayList<>();
// 遍历targetClass上的所有方法
ReflectionUtils.doWithMethods(targetClass, method -> {
// 当前Method上查找@RabbitListener直接
Collection listenerAnnotations = findListenerAnnotations(method);
if (listenerAnnotations.size() > 0) {
// 如果方法上有该注解,则将当前的Method对象及所有的RabbitListener注释类封装到ListenerMethod中
methods.add(new ListenerMethod(method,
listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
}
// 如果targetClass类上有@RabbitListener注解,那么会查找方法上是否有@RabbitHandler注解
// 注意:如果类上有@RabbitListener注解,那么方法上必须使用@RabbitHandler注解
// 在这里我们不分析该种情况,我们只分析在方法上有@RabbitListener注解的
if (hasClassLevelListeners) {
RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
if (rabbitHandler != null) {
multiMethods.add(method);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
if (methods.isEmpty() && multiMethods.isEmpty()) {
return TypeMetadata.EMPTY;
}
// 最后将解析出来的信息包装到TypeMetadata对象中
return new TypeMetadata(
methods.toArray(new ListenerMethod[methods.size()]),
multiMethods.toArray(new Method[multiMethods.size()]),
classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}
}
在上一步将所有的方法(方法上有@RabbitListener注解的)解析处理后,接下来开始处理消息监听
public class RabbitListenerAnnotationBeanPostProcessor {
private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
public Object postProcessAfterInitialization(...) throws BeansException {
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
// 为每个监听的方法Method创建端点(执行消息处理程序)
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
// 处理监听程序
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object target, String beanName) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
// 如果@RabbitListener没有配置id属性,那么将生成如下信息id
// org.springframework.amqp.rabbit.RabbitListenerEndpointContainer# + this.counter.getAndIncrement();
endpoint.setId(getEndpointId(rabbitListener));
// 设置监听的队列信息(@RabbitListener中配置的queues)
endpoint.setQueueNames(resolveQueues(rabbitListener));
// 并发数设置,会覆盖配置文件中配置的
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
endpoint.setBeanFactory(this.beanFactory);
// ...
// 解析如果配置了线程池,则在指定的线程池中运行
resolveExecutor(endpoint, rabbitListener, target, beanName);
// ...
// 设置应答模式
resolveAckMode(endpoint, rabbitListener);
// ...
// 获取消息监听容器工厂,这就是获取在2.1中配置的SIMPLE类型的监听容器工厂
// 默认这里没有配置将会返回null
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);
this.registrar.registerEndpoint(endpoint, factory);
}
}
接着上面构建完MethodRabbitListenerEndpoint对象后,将所有的监听方法保存
public class RabbitListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
// 这里的factory是null
public void registerEndpoint(RabbitListenerEndpoint endpoint, @Nullable RabbitListenerContainerFactory<?> factory) {
// factory,我们在实际创建容器之前推迟解决(这是源码中的注释说明)
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
// 默认是false
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor), true);
} else {
// 添加到集合中
this.endpointDescriptors.add(descriptor);
}
}
}
}
到此消息监听@RabbitListener注解的方法就处理完成了,所有的监听方法都保存到了RabbitListenerAnnotationBeanPostProcessor.registrar.endpointDescriptors集合中。
RabbitListenerAnnotationBeanPostProcessor 处理器程序实现了SmartInitializingSingleton接口,所以在所有的Bean创建完成以后会执行Bean实现了SmartInitializingSingleton#afterSingletonsInstantiated的方法。
public class RabbitListenerAnnotationBeanPostProcessor implements ... SmartInitializingSingleton {
public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";
private String defaultContainerFactoryBeanName = DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME;
@Override
public void afterSingletonsInstantiated() {
// 设置BeanFactory工厂
this.registrar.setBeanFactory(this.beanFactory);
if (this.beanFactory instanceof ListableBeanFactory) {
// ... 这里代码执行的集合会为空
}
// 这里默认会为null,因为registrar是直接new并没有设置
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
// 从容器中查找,在2.1中注册的RabbitListenerEndpointRegistry对象
this.endpointRegistry = this.beanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
}
// 设置
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
// 设置默认的容器工厂BeanName
if (this.defaultContainerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
}
// ...
// 这里注册所有的监听程序
this.registrar.afterPropertiesSet();
// 清空缓存
this.typeCache.clear();
}
}
注册监听程序
public class RabbitListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
// 变量在2.5中构建的所有监听方法
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
// 注册消息监听容器;注意这里resolveContainerFactory方法会获取容器中在2.1中创建的容器工厂
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));
}
// 触发立即启动
this.startImmediately = true;
}
}
private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
if (descriptor.containerFactory != null) {
return descriptor.containerFactory;
} else if (this.containerFactory != null) {
return this.containerFactory;
} else if (this.containerFactoryBeanName != null) {
// 直接进入到此处获取容器在2.1中创建的SimpleRabbitListenerContainerFactory
this.containerFactory = this.beanFactory.getBean(this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
return this.containerFactory ;
}
}
}
注册监听容器
public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware, ApplicationListener {
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
registerListenerContainer(endpoint, factory, false);
}
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
// 获取id;关于id值已经在2.4中有说明
String id = endpoint.getId();
synchronized (this.listenerContainers) {
// 根据在2.4中构造的消息监听端点创建消息监听容器
// 这里的factory = SimpleRabbitListenerContainerFactory ;在2.1中创建
// 下面的步骤创建消息监听容器
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
} else {
containerGroup = new ArrayList();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (this.contextRefreshed) {
container.lazyLoad();
}
if (startImmediately) {
startIfNecessary(container);
}
}
}
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
RabbitListenerContainerFactory<?> factory) {
// 通过SimpleRabbitListenerContainerFactory创建监听容器
// 调用父类AbstractRabbitListenerContainerFactory#createListenerContainer方法
// SimpleRabbitListenerContainerFactory在2.1中创建,并且设置了配置文件中配置的相关参数信息
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
if (listenerContainer instanceof InitializingBean) {
try {
// 执行初始化方法
((InitializingBean) listenerContainer).afterPropertiesSet();
}
// ...
}
int containerPhase = listenerContainer.getPhase();
if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
// throw exception
}
this.phase = listenerContainer.getPhase();
}
return listenerContainer;
}
}
监听容器工厂父类创建监听容器
public abstract class AbstractRabbitListenerContainerFactory {
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
// 调用子类SimpleRabbitListenerContainerFactory方法
// 如下创建的SimpleMessageListenerContainer对象
C instance = createContainerInstance();
JavaUtils javaUtils =
JavaUtils.INSTANCE
.acceptIfNotNull(this.connectionFactory, instance::setConnectionFactory)
.acceptIfNotNull(this.errorHandler, instance::setErrorHandler);
if (this.messageConverter != null && endpoint != null) {
endpoint.setMessageConverter(this.messageConverter);
}
javaUtils
.acceptIfNotNull(this.acknowledgeMode, instance::setAcknowledgeMode)
// ...
if (this.batchListener && this.deBatchingEnabled == null) {
// turn off container debatching by default for batch listeners
instance.setDeBatchingEnabled(false);
}
// 覆盖默认工厂的参数信息
if (endpoint != null) { // endpoint settings overriding default factory settings
javaUtils
.acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup)
.acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor)
.acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode);
javaUtils
.acceptIfNotNull(this.batchingStrategy, endpoint::setBatchingStrategy);
instance.setListenerId(endpoint.getId());
endpoint.setBatchListener(this.batchListener);
endpoint.setupListenerContainer(instance);
}
// ...
initializeContainer(instance, endpoint);
if (this.containerCustomizer != null) {
this.containerCustomizer.configure(instance);
}
return instance;
}
}
public class SimpleRabbitListenerContainerFactory {
protected SimpleMessageListenerContainer createContainerInstance() {
return new SimpleMessageListenerContainer();
}
protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
super.initializeContainer(instance, endpoint);
JavaUtils javaUtils = JavaUtils.INSTANCE
.acceptIfNotNull(this.batchSize, instance::setBatchSize);
String concurrency = null;
if (endpoint != null) {
concurrency = endpoint.getConcurrency();
javaUtils.acceptIfNotNull(concurrency, instance::setConcurrency);
}
javaUtils
.acceptIfCondition(concurrency == null && this.concurrentConsumers != null, this.concurrentConsumers,
instance::setConcurrentConsumers)
.acceptIfCondition((concurrency == null || !(concurrency.contains("-"))) && this.maxConcurrentConsumers != null,
this.maxConcurrentConsumers, instance::setMaxConcurrentConsumers)
.acceptIfNotNull(this.startConsumerMinInterval, instance::setStartConsumerMinInterval)
.acceptIfNotNull(this.stopConsumerMinInterval, instance::setStopConsumerMinInterval)
.acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger)
.acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger)
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout);
if (Boolean.TRUE.equals(this.consumerBatchEnabled)) {
instance.setConsumerBatchEnabled(true);
instance.setDeBatchingEnabled(true);
}
}
}
到此消息监听容器MessageListenerContainer(SimpleMessageListenerContainer)对象创建完成,
到这里主要的消息监听容器都创建完成后接下来就是启动消息监听容器了。
在2.2中注册了RabbitListenerEndpointRegistry 对象,该类实现了SmartLifecycle接口,也实现了ApplicationListener接口,并且处理的是ContextRefreshedEvent事件。
上面这两个动作都会在容器上下文初始化完成以后触发,在AbstractApplicationContext#refresh#finishRefresh方法中触发
public abstract class AbstractApplicationContext {
protected void finishRefresh() {
// Initialize lifecycle processor for this context.
// 初始化生命周期处理器类,默认为:DefaultLifecycleProcessor
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
// 处理生命周期方法
getLifecycleProcessor().onRefresh();
// Publish the final event.
// 发布事件
publishEvent(new ContextRefreshedEvent(this));
}
}
声明周期执行
public class DefaultLifecycleProcessor implements LifecycleProcessor, BeanFactoryAware {
public void onRefresh() {
startBeans(true);
this.running = true;
}
// 执行所有的SmartLifecycle#start方法
private void startBeans(boolean autoStartupOnly) {
Map lifecycleBeans = getLifecycleBeans();
Map phases = new TreeMap<>();
lifecycleBeans.forEach((beanName, bean) -> {
if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int phase = getPhase(bean);
phases.computeIfAbsent(phase,
p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)
).add(beanName, bean);
}
});
if (!phases.isEmpty()) {
phases.values().forEach(LifecycleGroup::start);
}
}
}
开始消息监听
public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
ApplicationListener {
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}
// listenerContainer = SimpleMessageListenerContainer
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
// 启动
// 调用父类的start方法
listenerContainer.start();
}
}
}
public abstract class AbstractMessageListenerContainer {
public void start() {
// ...
// 该方法在子类中实现
doStart();
// ...
}
}
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
// 父类中的属性
private Executor taskExecutor = new SimpleAsyncTaskExecutor();
protected void doStart() {
int newConsumers = initializeConsumers();
synchronized (this.consumersMonitor) {
int newConsumers = initializeConsumers();
// ...
Set processors = new HashSet();
// 变量消费者
for (BlockingQueueConsumer consumer : this.consumers) {
// AsyncMessageProcessingConsumer 实现了Runable接口
// 核心消息的处理就在该Consumer中进行
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
// 获取线程池提交任务Runnable对象
// 线程池对象是系统默认的SimpleAsyncTaskExecutor
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
waitForConsumersToStart(processors);
}
}
protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.cancellationLock.reset();
// 根据并发数创建阻塞队列(有几个并发消费者,就创建几个阻塞队列消费者(内部使用的LinkedBlockingQueue队列))
this.consumers = new HashSet(this.concurrentConsumers);
for (int i = 0; i < this.concurrentConsumers; i++) {
// 创建阻塞队列消费者
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
count++;
}
}
}
return count;
}
protected BlockingQueueConsumer createBlockingQueueConsumer() {
BlockingQueueConsumer consumer;
String[] queues = getQueueNames();
// 实际就是配置的prefetch属性值,该值的取值0到65535,如果是0表示没有限制
// 该prefetch参数与Qos绑定(服务质量)channel.basicQos(this.prefetchCount,...)
int actualPrefetchCount = getPrefetchCount() > this.batchSize ? getPrefetchCount() : this.batchSize;
consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);
consumer.setGlobalQos(isGlobalQos());
consumer.setMissingQueuePublisher(this::publishMissingQueueEvent);
if (this.declarationRetries != null) {
consumer.setDeclarationRetries(this.declarationRetries);
}
if (getFailedDeclarationRetryInterval() > 0) {
consumer.setFailedDeclarationRetryInterval(getFailedDeclarationRetryInterval());
}
if (this.retryDeclarationInterval != null) {
consumer.setRetryDeclarationInterval(this.retryDeclarationInterval);
}
ConsumerTagStrategy consumerTagStrategy = getConsumerTagStrategy();
if (consumerTagStrategy != null) {
consumer.setTagStrategy(consumerTagStrategy);
}
consumer.setBackOffExecution(getRecoveryBackOff().start());
consumer.setShutdownTimeout(getShutdownTimeout());
consumer.setApplicationEventPublisher(getApplicationEventPublisher());
return consumer;
}
}
异步消息处理消费者AsyncMessageProcessingConsumer
private final class AsyncMessageProcessingConsumer implements Runnable {
public void run() {
// ...
// 核心方法
try {
// 该方法中主要就是从Rabbit Broker上获取消息存入到消息队列中
// 1.首先是设置设置Qos该值对应的是配置中的prefetch, channel.basicQos(this.prefetchCount)
// 2.消费获取消息内部定义了InternalConsumer消费者重写了handleDelivery方法
// 在该方法中会将获取到的消息包装到Delivery对象中然后调用BlockingQueue#offer方法
// 存入队列中,offer如果存入成功返回true,失败返回false。
initialize();
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
// 从队列中不停的拿消息
mainLoop();
}
}
// ...
}
}
事件处理
public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
ApplicationListener {
private boolean contextRefreshed;
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().equals(this.applicationContext)) {
// 当收到事件后设置为true表示上下文已经初始化完成了
this.contextRefreshed = true;
}
}
}
RabbitAutoConfiguration ===》RabbitAnnotationDrivenConfiguration ===》EnableRabbitConfiguration ===》 @EnableRabbit
注册RabbitListenerAnnotationBeanPostProcessor处理器处理@RabbitListener和@RabbitHandler注解
RabbitListenerAnnotationBeanPostProcessor类
将上一步解析出来的所有方法及对应的@RabbitListener注解中配置的信息进行包装到MethodRabbitListenerEndpoint中
说明:@RabbitListener注解中的errorHandler属性可以是SpEL表达式也可以是一个Bean的名称
该步骤中主要就是设置相关的一些属性信息到Endpoint中,比如:ackMode,queueName,concurrency等信息。
构造完Endpoint对象后将其保存到RabbitListenerEndpointRegistrar中。
RabbitListenerAnnotationBeanPostProcessor类实现了SmartInitializingSingleton接口,当所有的Bean初始化完成以后会执行实现了SmartInitializingSingleton接口Bean的回调方法afterSingletonsInstantiated。
在afterSingletonsInstantiated方法中调用RabbitListenerAnnotationBeanPostProcessor.registrar(RabbitListenerEndpointRegistrar)#afterPropertiesSet
方法。
在afterPropertiesSet方法中就是注册Endpoint了,在该方法中将所有的Endpoint再封装成MessageListenerContainer(SimpleMessageListenerContainer)
对象,最后将MessageListenerContainer对象保存到RabbitListenerEndpointRegistry.listenerContainers的Map集合中。
在这里是还没有启动所有的监听程序。
RabbitListenerEndpointRegistry对象Bean实现了SmartLifecycle接口,所以容器上下文执行完(刷新完)以后会调用实现了该接口的会滴方法start,启动消息监听。
SpringBoot多数据源配置详解
SpringBoot邮件发送示例
Springboot面试题整理附答案
SpringBoot配置文件你了解多少?
SpringBoot项目查看线上日志
springboot mybatis jpa 实现读写分离
Springboot整合openfeign使用详解
SpringBoot RabbitMQ消息可靠发送与接收
Springboot整合MyBatis复杂查询应用
Springboot整合RabbitMQ死信队列详解
页面更新:2024-05-14
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号