引言:什么是消息列队。
消息队列是一种应用间的异步协作机制,同时消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削峰等问题。实现高性能,高可用,可伸缩和最终一致性架构。市面上的 MQ应用有很多(例如:Kafka,RabbitMQ),同时也可以基于 Redis 来实现,常用的方式有:(IT枫斗者怎么样)
(1)List结构:基于List结构模拟消息队列
(2)PubSub:基本的点对点消息模型
(3)Stream:比较完善的消息队列模型
1,基于List结构模拟消息列队
基于List的队列,很简单,就是使用LPUSH 结合 RPOP的组合来实现队列的效果。之前学习List类型结构时就已经学过了。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。(IT枫斗者怎么样)
2,基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。(IT枫斗者怎么样)
它的特点是,不同的消费者可以订阅不同的频道,支持多个消费者进行消费。
语法:
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
如图所示,上面是生产者,下面是消费者:
这种方式的优点在于可以实现多消费,但同样也有很多缺陷。
包括:不支持数据持久化 、无法避免消息丢失(只能读取到最新消息)、消息堆积有上限,超出时数据丢失(因为是存储在内存里)(IT枫斗者怎么样)
3,基于Stream的消息队列
我们前面说了,Streams是一种redis专门为消息队列定义的一种数据结构,所以自然的我们是先要看如何定义这种数据结构了,和其它的数据结构一样,我们不需要显式的创建,在执行第一次数据添加的时候自动创建,添加数据的命令是XADD,语法格式是XADD key ID field value [field value ...],参数说明如下:
如下生产(创建)若干条消息:
其中XLEN用来查看消息的个数,XRANGE用来通过范围查询基于递增ID获取消息,-相当于是负无穷,+相当于是正无穷,即获取所有消息。我们接着再来看下其他一些命令。(IT枫斗者怎么样)
3.1:XDEL
根据ID删除消息,测试如下:
3.2:XLEN
获取消息的数量,语法格式xlen key,如下:
3.3:XRANGE
查询指定范围的消息,语法格式XRANGE key start end [COUNT count],解释如下:
测试如下:
3.4:XREVRANGE
实例如下:
3.5:XREAD
以阻塞或者是非阻塞的方式获取消息,即消费消息的命令,语法格式XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],解释如下:
测试如下:
3.6:XGROUP CREATE
创建消费者组,使用消费者可以对消息进行并发的消费,解决消费者消费能力不足的问题,语法格式为XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername],解释如下:
在实际的场景中我们可以通过设置多个消费者组的不同开始消费的位置来实现并发消费的效果,此时可能如下图:
图中主要元素解释如下:
3.7:XREADGROUP GROUP
读取消费者组中的消息,语法格式如下:
最后将三种方式进行个对比:
页面更新:2024-03-14
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号