本文介绍了RocketMQ的文件存储原理,并且用MappedByteBuffer实现了一个简单的RocketMQ文件持久化和读取。本文适合希望进一部了解RocketMQ底层文件存储原理的开发者,学习本文需要对消息队列有一定的使用经验,对Java NIO文件读写有一定的了解。
主要内容:
1.RocketMQ文件简介
2.RocketMQ文件结构说明
3.MappedByteBuffer简介
4.最精简的RocketMQ文件存储实现(干货)
RocketMQ具有其强大的存储能力和强大的消息索引能力,从众多消息中间件产品中脱颖而出,其原理很值得学习。
RocketMQ存储用的是本地文件存储系统,效率高也可靠。存储文件主要分为CommitLog,ConsumeQueue,Index 三类文件。
CommitLog
消息存储文件,所有消息主题的消息都存储在 CommitLog 文件中。 CommitLog单个文件大小默认1G,文件文件名是起始偏移量,总共20位,左边补零,起始偏移量是0。
比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824
ConsumeQueue
消息消费的逻辑队列,作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
IndexFile
消息索引文件,主要存储消息 Key 与 Offset 的对应关系。
消息消费队列是RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息队 列检索消息的速度
config文件夹
config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。
topics.json : topic 配置属性
subscriptionGroup.json :消息消费组配置信息。
delayOffset.json :延时消息队列拉取进度。
consumerOffset.json :集群消费模式消息消进度。
consumerFilter.json :主题消息过滤信息。
几种文件的存储目录:
ConsumeQueue
ConsumeQueue 文件保存在 store 目录下的 consumequeue 目录中。
ConsumeQueue每条数据占20字节空间,包含三部分内容:消息的offset、消息大小size、tag的hashCode。单个ConsumeQueue文件最多保存30W条数据。
8byte (commitlogoffset) | 4byte (msgLength) | 8byte (tagCode) |
一个topic会分成多个逻辑队列,每个逻辑队列对应一个ConsumeQueue文件,根据topic和queueId来组织文件,如果TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。见RocketMQ文件逻辑图:
CommitLog
消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。
文件的默认位置如下,仍然可通过配置文件修改:
${user.home} store${commitlog}${fileName}
CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。
message1 | totalSize | queueId | queueOffset | PhysicalOffset | body | topic | 其它 |
message2 | totalSize | queueId | queueOffset | PhysicalOffset | body | topic | 其它 |
message3 | totalSize | queueId | queueOffset | PhysicalOffset | body | topic | 其它 |
字段说明:
单commitLog优点:对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。
缺点:写虽然完全是顺序写,但是读却变成了完全的随机读。读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度
Config的offsetTable.offset
和ConsumeQueue索引文件对应,这个offset是ConsumeQueue文件的(已经消费的)下标/行数,可以直接定位到ConsumeQueue并找到commitlogOffset从而找到消息体原文,
这个offset是消息消费进度的核心
{
"offsetTable":{
"zxp_test_topic@zxp_test_group2":{0:16,1:17,2:23,3:43
},
"TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:250,3:250
},
"%RETRY%zxp_test_group2@zxp_test_group2":{0:3
}
"order_topic@zxp_test_group3":{0:0,1:3,2:3,3:3
}
}
}
OffsetStore分为以下2种,分别存储在客户端和服务器端:
本地文件类型
BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在Consumer本地,因为每条消息会被消费组内所有的消费者消费,同消费组的消费者相互独立,消费进度要单独存储,会以文本文件的形式存储在客户端,对应的数据结构为LocalFileOffsetStore
Broker代存储类型
在集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,另外,消费者发生异常或重启为了保证可以从上一次消费的地方继续进行消费,这时的offset是统一保存到broker服务端的。对应的数据结构为RemoteBrokerOffsetStore。
IndexFile
用于为生成的索引文件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引;
indexFile存放的位置:${rocketmq.home}/store/index/indexFile(年月日时分秒等组成文件名)
我们发送的消息体中,包含 Message Key 或 Unique Key ,那么就会给它们每一个都构建索引。
将当前 Index 条目的索引值,写在 Hash 槽 absSlotPos 位置上;将 Index 条目的具体信息 (hashcode/消息偏移量/时间差值/hash槽的值) ,从起始偏移量 absIndexPos 开始,顺序按字节写入。
由于出现了多个偏移量的概念,所以我总结一下:
以前我们操作大文件都是用BufferedInputStream、BufferedOutputStream等带缓冲的IO流处理,但是针对大文件读写性能不理想。
MappedByteBuffer是Java提供的基于操作系统虚拟内存映射(MMAP)技术的文件读写API,采用direct buffer的方式读写文件内容,底层不再通过read、write、seek等系统调用实现文件的读写,所以效率非常高。主要用于操作大文件,如上百M、上GB的大文件。RocketMQ使用MappedByteBuffer实现高性能的文件读写。
MMAP原理
一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
普通文件读写
这两个操作发生了两次系统调用,每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态,也就是消息发送过程中一共发生了 4 次用户态与内核态的上下文切换。另外还发生了 4 次数据拷贝,其中两次是 DMA 的拷贝,另外两次则是通过 CPU 拷贝的,分别是:
mmap文件读写
系统调用函数在调用进程的虚拟地址空间中创建一个新映射。这个映射会直接把内核缓冲区里的数据映射到用户空间,这样就不用从内核空间到用户空间来回复制数据了。
应用进程调用 mmap(),DMA 把数据从磁盘拷贝到内核缓冲区里;
应用进程调用 write(),CPU直接将内核缓冲区的数据拷贝到 socket 缓冲区中;
DMA把数据从内核的 socket 缓冲区拷贝到网卡的缓冲区里。
通过上面的分析,我们可以发现,比起原始版本,mmap + write 的方式依然需要4 次用户态与内核态的上下文切换,但是少了一次内存拷贝。
代码示例:
public static void read() throws IOException {
try (RandomAccessFile file = new RandomAccessFile(new File("test.txt"), "r"))
{
//get Channel
FileChannel fileChannel = file.getChannel();
//get mappedByteBuffer from fileChannel
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
// check buffer
LOG.info("is Loaded in physical memory: {}",buffer.isLoaded()); //只是一个提醒而不是guarantee
LOG.info("capacity {}",buffer.capacity());
//read the buffer
for (int i = 0; i < buffer.limit(); i++)
{
LOG.info("get {}", buffer.get());
}
}
}
public static void writeWithMap() throws IOException {
try (RandomAccessFile file = new RandomAccessFile(new File("a.txt"), "rw"))
{
//get Channel
FileChannel fileChannel = file.getChannel();
//get mappedByteBuffer from fileChannel
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096 * 8 );
// check buffer
LOG.info("is Loaded in physical memory: {}",buffer.isLoaded()); //只是一个提醒而不是guarantee
LOG.info("capacity {}",buffer.capacity());
//write the content
buffer.put("dhy".getBytes());
}
}
FileChannel的map方法有三个参数:
1.简单索引文件读写
模拟conusmQueue创建10个索引,长度固定20,保存到文件。
public class FileWrite {
public static void main(String[] args) throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/111.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
fileChannel.close();
for(int i =0;i<10;i++){
mappedByteBuffer.position(i*20);
ByteBuffer b = ByteBuffer.allocate(20);
b.putLong(100);//8byte(commitlog offset)
b.putInt(1000);//4byte (msgLength)
b.putLong(20);//8byte (tagCode)
b.flip();
mappedByteBuffer.put(b);
}
mappedByteBuffer.force();
}
}
public class FileRead {
public static void main(String[] args) throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/111.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
fileChannel.close();
for(int i =0;i<10;i++){
mappedByteBuffer.position(i*20);
long commitlogOffset = mappedByteBuffer.getLong();
long msgLen = mappedByteBuffer.getInt();
long tagCode = mappedByteBuffer.getLong();
System.out.println("文件读取:commitlogOffset:"+commitlogOffset+",msgLen:"+msgLen+",tagCode:"+tagCode);
}
}
}
运行结果:
2.基于consumeQueue和CommitLog的读写
手动创建100个消息体,存入commitLog,然后创建索引文件
public class CommitLogWriteTest {
private static Long commitLogOffset = 0L;//8byte(commitlog offset)
private static Long lastTotalSize = 0L;
public static void main(String[] args) throws IOException {
List list = createCommitLog();
createConsumerQueue(list);
}
private static List createCommitLog() throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
fileChannel.close();
List list = new ArrayList<>();
Random random = new Random();
int count = 0;
for (int i = 0; i < 100; i++) {
long commitLogOffset = lastTotalSize;
String topic = "Topic-test";
String msgId = UUID.randomUUID().toString();
String msgBody = "消息内容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48));//
long queueOffset =i;//索引偏移量
String transactionId = UUID.randomUUID().toString();
/* 数据格式,位置固定
int totalSize;//消息长度
String msgId;
String topic;
long queueOffset;//索引偏移量
long bodySize;//消息长度
byte[] body;//消息内容
String transactionId;
long commitLogOffset;//从第一个文件开始算的偏移量
*/
int totalSize = 8 //totalSize长度
+ 64 //msgId长度
+ 64 //topic长度
+ 8 //索引偏移量长度
+ 8 //消息长度长度
+ msgBody.getBytes(StandardCharsets.UTF_8).length //消息内容长度
+ 64 //transactionId长度
+ 64 //commitLogOffset长度;
;
ByteBuffer b = ByteBuffer.allocate(totalSize);
// //如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300
mappedByteBuffer.position(Integer.valueOf(commitLogOffset+""));
b.putLong(totalSize);//totalSize
b.put(getBytes(msgId, 64));//msgId
b.put(getBytes(topic, 64));//topic,定长64
b.putLong(queueOffset);//索引偏移量
b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
b.put(getBytes(transactionId, 64));
b.putLong(commitLogOffset);//bodySize
b.flip();
mappedByteBuffer.put(b);
allTotalSize = totalSize + allTotalSize;
System.out.println("写入消息,第:" + i + "次");
System.out.println("totalSize:" + totalSize);
System.out.println("msgId:" + msgId);
System.out.println("topic:" + topic);
System.out.println("msgBody:" + msgBody);
System.out.println("transactionId:" + transactionId);
System.out.println("commitLogOffset:" + commitLogOffset);
ConsumerQueueData consumerQueueData = new ConsumerQueueData();
consumerQueueData.setOffset(commitLogOffset);
consumerQueueData.setMsgLength(totalSize);
consumerQueueData.setTagCode(100L);
list.add(consumerQueueData);
count ++;
}
mappedByteBuffer.force();
System.out.println("commitLog数据保存完成,totalSize:" + count);
return list;
}
private static void createConsumerQueue(List list) throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
fileChannel.close();
int count = 0;
for (int i = 0; i < list.size(); i++) {
ConsumerQueueData consumerQueueData = list.get(i);
mappedByteBuffer.position(i * 20);
ByteBuffer b = ByteBuffer.allocate(20);
b.putLong(consumerQueueData.getOffset());//8byte(commitlog offset)
b.putInt(consumerQueueData.getMsgLength());//4byte (msgLength)
b.putLong(consumerQueueData.getTagCode());//8byte (tagCode)
b.flip();//很重要,使读指针从头开始
mappedByteBuffer.put(b);
count++;
System.out.println("createConsumerQueue:" + JSON.toJSONString(consumerQueueData));
}
System.out.println("ConsumerQueue数据保存完成count:" + count);
mappedByteBuffer.force();
}
//将变长字符串定长byte[],方便读取
private static byte[] getBytes(String s, int length) {
int fixLength = length - s.getBytes().length;
if (s.getBytes().length < length) {
byte[] S_bytes = new byte[length];
System.arraycopy(s.getBytes(), 0, S_bytes, 0, s.getBytes().length);
for (int x = length - fixLength; x < length; x++) {
S_bytes[x] = 0x00;
}
return S_bytes;
}
return s.getBytes(StandardCharsets.UTF_8);
}
}
运行结果:(数据有100条,没展示全部)
读取索引文件,然后根据偏移量在commitLog文件中读取消息
public class CommitLogReadTest {
static FileChannel commitLogfileChannel = null;
public static void main(String[] args) throws IOException {
FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
fileChannel.close();
int index = 0 ;
for(int i =index;i<100;i++){
//根据索引下标读取索引,实际情况是用户消费的最新点位,存在在broker的偏移量文件中
mappedByteBuffer.position(i*20);
long commitlogOffset = mappedByteBuffer.getLong();
// System.out.println(commitlogOffset);
long msgLen = mappedByteBuffer.getInt();
Long tag = mappedByteBuffer.getLong();
//System.out.println("======读取到consumerQueue,commitlogOffset:"+commitlogOffset+",msgLen :"+msgLen+"===");
//根据偏移量读取CcommitLog
readCommitLog(Integer.valueOf(commitlogOffset+""));
}
}
public static MappedByteBuffer initFileChannel() throws IOException {
MappedByteBuffer mappedByteBuffer = null;
if(mappedByteBuffer == null){
commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
StandardOpenOption.WRITE, StandardOpenOption.READ);
mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
commitLogfileChannel.close();
}
return mappedByteBuffer;
}
/*
*
* 根据偏移量读取CommitLog
* */
public static void readCommitLog(int offset) throws IOException {
/*写入顺序,读的时候也按这个顺序读取
b.putLong(totalSize);//totalSize
b.put(getBytes(msgId, 64));//msgId
b.put(getBytes(topic, 64));//topic,定长64
b.putLong(queueOffset);//索引偏移量
b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
b.put(getBytes(transactionId, 64));
b.putLong(commitLogOffset);//commitLogOffset
*/
System.out.println("=================commitlog读取偏移量为"+offset+"的消息===================");
MappedByteBuffer mappedByteBuffer = initFileChannel();
//很重要,按偏移量读取文件,入参为索引文件记录的偏移量
mappedByteBuffer.position(offset);
long totalSize = mappedByteBuffer.getLong();//消息长度
byte[] msgIdByte = new byte[64];//uuid 固定是64
mappedByteBuffer.get(msgIdByte);
byte[] topicByte = new byte[64];// 固定是64
mappedByteBuffer.get(topicByte);
long queueOffset = mappedByteBuffer.getLong();
Long bodySize = mappedByteBuffer.getLong();
byte[] bodyByte = new byte[Integer.parseInt(bodySize+"")];//bodySize 长度不固定
mappedByteBuffer.get(bodyByte);
byte[] transactionIdByte = new byte[64];//uuid 固定是64
mappedByteBuffer.get(transactionIdByte);
long commitLogOffset = mappedByteBuffer.getLong();//偏移量
System.out.println("totalSize:"+totalSize);
System.out.println("msgId:"+new String(msgIdByte));
System.out.println("topic:"+new String(topicByte));
System.out.println("queueOffset:"+queueOffset);
System.out.println("bodySize:"+bodySize);
System.out.println("body:"+new String(bodyByte));
System.out.println("transactionId:"+new String(transactionIdByte));
System.out.println("commitLogOffset:"+commitLogOffset);
}
}
运行结果:(数据有100条,没展示全部)
总结:
本文介绍了RocketMQ的文件存储基本原理,并基于Java NIO的MappedByteBuffer实现了对RocketMQ的存储文件CommotLog,索引文件ConsumeQueue的写入,以及按索引下标读取CommotLog的,希望能加深大家对RocketMQ文件存储的理解。
页面更新:2024-04-29
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号