RabbitMQ相关概念及代码示例

这篇文章主要介绍RabbitMQ中几个重要的概念,可以看作是AMQP协议的实现篇,因为这里包含了部分代码实例。

对于初学者来说,概念性的东西可能比较难以理解,但是对于理解和使用RabbitMQ却必不可少,初学阶段,现在脑海里留有印象,随着后续更加深入的学习,就会很容易理解。对于消息队列的高手,这篇文章如有阐述不到位的,可以一起交流。

RabbitMQ整体上是一个生产者和消费者模型,如下图。

生产者和消费者

RabbitMQ的Producer和其他消息队列的Producer没有什么不同,都是用来将消息发送到服务器,只是在实现上有所区别,关于RabbitMQ客户端的实现,包括API和网络模型等,后面会专门有文章介绍。

RabbitMQ Consumer连接Broker,并订阅它关注的队列,只要队列上有消息,Consumer就会接收到消息并开始消费消息。RabbitMQ的消费端默认是推模型。

有Producer、Consumer,那么就会有一个地方来存储、转发消息,RabbitMQ Broker完成这项工作,在这里,可以先简单的把一个Broker理解为一个RabbitMQ 节点或实例。.

队列

Queue是RabbitMQ的内部对象,是实际存储Producer发送的消息的地方,这点和Kafka存储消息的模型不一样。

Producer发送消息并不是直接发送到Queue,而是在发送消息的请求中声明Exchange(交换器) 和 RoutingKey(绑定键),Broker会根据Exchange 和 RoutingKey找到相应的Queue,并保存消息内容到Queue。

Consumer订阅的是Queue,所有直接从Queue上消费消息。RabbitMQ支持多个Consumer同时订阅一个Queue,这时Broker会轮询Consumer,把Queue中的消息均摊到所有订阅次Queue的Consumer。但是RabbitMQ不支持队列层面的广播消费。

交换器、路由键、绑定

下面介绍的是RabbitMQ中非常重要的概念,生产和消费消息都是以此为基础,也是对AMQP协议的具体实现。

交换器(Exchange)负责按照一定的规则分发消息,Producer发送的消息实际上是到Exchange,由Exchange将消息路由到一个或多个Queue,如果找不到Queue,则根据客户端配置,要么返回给Producer,要么直接丢弃。

路由键(RoutingKey)指定了路由规则,在Producer发送消息的时候,一般会指定RoutingKey,这样就知道消息需要路由到哪里。

绑定(BingKey)将Exchange和Queue关联起来,这样,RabbitMQ就知道如何正确的将消息路由到队列了。

关于路由键和绑定键,对于初学者可能有点混淆,这里分享下我的理解:路由键是在客户端发送消息的时候,告诉服务器,我发送的消息根据我指定的路由键去找队列;而绑定键,是在创建的时候使用的,告诉服务器,交换器是如何与队列关联的。具体可以对照下面的代码示例来理解可能容易点。

交换器类型

RabbitMQ提供了多个交换器类型来满足不同的需求:fanout、direct、topic、headers。

fanout

fanout exchange会把消息发送到所有与该交换器绑定的队列中,会忽略Procuder发送消息时申明的RoutingKey。如下图,Producer发送message给fanout_exchange,并制定了routing key 为 info,最终queue1和queue2都收到了这条消息。



    public void testFanoutExchange() throws IOException, TimeoutException {
        // fanout类型的 exchange, 在server端忽略 routing key,只要发送到 exchange,任何和这个 exchange 绑定的 queue都会收到这条消息
 
        String exchangeName = "fanout_exchange";
        String queue1 = "queue1";
        String queue2 = "queue2";
        String warning = "warning";
        String info = "info";
 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
 
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, null);
 
        channel.queueDeclare(queue1, true, false, false, null);
        channel.queueDeclare(queue2, true, false, false, null);
 
        channel.queueBind(queue1, exchangeName, warning);
        channel.queueBind(queue2, exchangeName, warning);
        channel.queueBind(queue2, exchangeName, info);
 
        String message = "info";
        channel.basicPublish(exchangeName, info, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
 
        channel.close();
        connection.close();
 
    }

direct

direct exchange也是一种比较简单的exchange,在路由的时候,只有routing key 和binding key完全匹配的时候,才会路由到queue。如下图,Producer发送消息到direct_exchange,routing_key 是info,只有queue2才会收到消息。



    public void testDirectExchange() throws IOException, TimeoutException {
        String exchangeName = "direct_exchange";
        String queue1 = "direct_queue1";
        String queue2 = "direct_queue2";
        String warning = "warning";
        String info = "info";
 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
        channel.queueDeclare(queue1, true, false, false, null);
        channel.queueDeclare(queue2, true, false, false, null);
 
        channel.queueBind(queue1, exchangeName, warning);
        channel.queueBind(queue2, exchangeName, warning);
        channel.queueBind(queue2, exchangeName, info);
 
        String message = "direct exchange";
        channel.basicPublish(exchangeName, info, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
 
        channel.close();
        connection.close();
 
    }

topic

topic exchange 是比较灵活,实际项目中用的比较多的一种,它也是将消息路由到 routing key 和 binding key 匹配的队列中,匹配规则如下:

下图中topic_exchange绑定了两个queue,*.rabbitmq.* 绑定颅queue1, *.*.client 、 com.# 都绑定了queue2,当binding key 为 com.rabbitmq.client,匹配queue1和queue2,因此都会收到消息;当binding key 为 org.rabbitmq.server 时,只有queue1匹配,当 binding key 为 com.hidden.demo 时,只有queue2匹配,当bingding key 为 aaa 时,queue1 和 queue2 都不匹配。



    public void testTopicExchange() throws IOException, TimeoutException {
        // * 匹配一个单词,# 匹配0个或多个单词
        String exchangeName = "topic_exchange";
        String queue1 = "topic_queue1";
        String queue2 = "topic_queue2";
        String routing1 = "*.rabbitmq.*";
        String routing2 = "*.*.client";
        String routing3 = "com.#";
 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, null);
        channel.queueDeclare(queue1, true, false, false, null);
        channel.queueDeclare(queue2, true, false, false, null);
 
        channel.queueBind(queue1, exchangeName, routing1);
        channel.queueBind(queue2, exchangeName, routing2);
        channel.queueBind(queue2, exchangeName, routing3);
 
        String message = "topic exchange";
        // queue1, queue2
        channel.basicPublish(exchangeName, "com.rabbitmq.client", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        // queue1
        channel.basicPublish(exchangeName, "org.rabbitmq.server", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        // queue2
        channel.basicPublish(exchangeName, "com.hidden.demo", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        // NONE
        channel.basicPublish(exchangeName, "aaaa", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    }

headers

headers exchange 不依赖路由键的匹配规则来路由消息,而是根据消息的headers属性值,RabbitMQ收到消息后,对比消息的headers中的属性值是否与queue、exchange绑定时指定的属性值一致,如果完全匹配,则路由消息到队列。由于headers exchange 性能很差,所以这里不做代码演示,感兴趣的小伙伴可以实验一下。

好了,以上就是对RabbitMQ中的一些概念做了一下介绍,小伙伴们可以多做实验加深理解,我也是在写了几个UnitTest之后,才理解这些概念,尤其是Exchange、RoutingKey 和 BindingKey。

RabbitMQ系列文章会陆续更新,欢迎各位小伙伴关注后面的技术分享。

展开阅读全文

页面更新:2024-03-20

标签:概念   都会   生产者   队列   示例   路由   绑定   客户端   模型   规则   消息   代码

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top