自己动手实现一个RocketMQ文件存储系统

前言

RocketMQ作为一款优秀的开源消息中间件,很多java开发者都在使用并研究里面的源码。目前网上有很多关于RocketMQ源代码的文章,但是很多文章只是从框架开发者的的角度分析源码,没有从技术实现本质进行剖析。因此很多源码学习者在读完后还是一知半解,当自己想动手写的时候不知要用到哪种技术,无从着手。笔者基于对RocketMQ的文件存储研究,结合开发者常见的技术,自己动手实现了一个简化版本的RocketMQ文件系统,希望能抽丝剥茧,帮助开发者从本质上理解RocketMQ文件存储的原理,起到抛砖引玉,举一反三的作用。

RocketMQ逻辑存储结构


本文适合对RocketMQ的文件存储原理有一定的了解,熟悉java NIO,希望了解RocketMQ是如何通过java NIO实现的读者。以下代码部分:


1.手动生成10个消息,并创建commitLog文件,consumeQueue,indexFile文件

package org.apache.rocketmq.test.smoke;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.test.smoke.model.ConsumerQueueData;
import org.apache.rocketmq.test.smoke.model.IndexFileHeaderData;
import org.apache.rocketmq.test.smoke.model.IndexFileItemData;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class CommitLogWriteTest {
    private static Long commitLogOffset = 0L;//8byte(commitlog offset)
    private static Long lastTotalSize = 0L;
    private static Long currentTotalSize = 0L;
    private static List consumerQueueDatas = new ArrayList<>();
    private static List indexFileItemDatas = new ArrayList<>();
    private static int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException {
        createCommitLog();
        createConsumerQueue();
        createIndexFile();
    }

    private static void createCommitLog() throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        fileChannel.close();
        Random random = new Random();

        int count = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            long commitLogOffset = lastTotalSize;

            String topic = "Topic-test";
            String msgId = UUID.randomUUID().toString();
            String msgBody = "消息内容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48));//
            long queueOffset = i;//索引偏移量
            String transactionId = UUID.randomUUID().toString();


         /* 数据格式,位置固定
         int totalSize;//消息长度
         String msgId;
         String topic;
         long queueOffset;//索引偏移量
         long bodySize;//消息长度
         byte[] body;//消息内容
         String transactionId;
         long commitLogOffset;//从第一个文件开始算的偏移量

         */

            int totalSize = 8 //totalSize长度
                    + 64  //msgId长度
                    + 64 //topic长度
                    + 8 //索引偏移量长度
                    + 8 //消息长度长度
                    + msgBody.getBytes(StandardCharsets.UTF_8).length //消息内容长度
                    + 64  //transactionId长度
                    + 64  //commitLogOffset长度;
                    ;

            ByteBuffer b = ByteBuffer.allocate(totalSize);
            // //如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300
            mappedByteBuffer.position(Integer.valueOf(commitLogOffset + ""));

            b.putLong(totalSize);//totalSize
            b.put(getBytes(msgId, 64));//msgId
            b.put(getBytes(topic, 64));//topic,定长64
            b.putLong(queueOffset);//索引偏移量
            b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
            b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
            b.put(getBytes(transactionId, 64));
            b.putLong(commitLogOffset);//bodySize
            b.flip();
            mappedByteBuffer.put(b);

            lastTotalSize = totalSize + lastTotalSize;

            System.out.println("写入消息,第:" + i + "次");

            System.out.println("totalSize:" + totalSize);
            System.out.println("msgId:" + msgId);
            System.out.println("topic:" + topic);
            System.out.println("msgBody:" + msgBody);
            System.out.println("transactionId:" + transactionId);
            System.out.println("commitLogOffset:" + commitLogOffset);

            ConsumerQueueData consumerQueueData = new ConsumerQueueData();
            consumerQueueData.setOffset(commitLogOffset);
            consumerQueueData.setMsgLength(totalSize);
            consumerQueueData.setTagCode(100L);
            //准备生成consumeQueue文件
            consumerQueueDatas.add(consumerQueueData);

            IndexFileItemData indexFileItemData = new IndexFileItemData();
            indexFileItemData.setKeyHash(msgId.hashCode());
            indexFileItemData.setMessageId(msgId);
            indexFileItemData.setPhyOffset(commitLogOffset);
            //准备生成indexFile文件
            indexFileItemDatas.add(indexFileItemData);
            mappedByteBuffer.force();
            count++;
        }

        System.out.println("commitLog数据保存完成,totalSize:" + count);

    }


    public static void createConsumerQueue() throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        fileChannel.close();
        int count = 0;
        for (int i = 0; i < consumerQueueDatas.size(); i++) {
            ConsumerQueueData consumerQueueData = consumerQueueDatas.get(i);
            //指定写入位置
            mappedByteBuffer.position(i * 20);
            mappedByteBuffer.putLong(consumerQueueData.getOffset());//8byte(commitlog offset)
            mappedByteBuffer.putInt(consumerQueueData.getMsgLength());//4byte (msgLength)
            mappedByteBuffer.putLong(consumerQueueData.getTagCode());//8byte (tagCode)

            count++;
            System.out.println("consumerQueue数据写入完成:" + JSON.toJSONString(consumerQueueData));
            mappedByteBuffer.force();

        }
        System.out.println("ConsumerQueue数据保存完成count:" + count);

    }



    public static void createIndexFile() throws IOException {
        //文件场创建时间,在写第一条消息的时候创建
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        ByteBuffer headerByteBuffer = mappedByteBuffer.slice();
        long firstDataTime = System.currentTimeMillis();

        fileChannel.close();

        //开始写hash槽,从头部后写入
        /*  已经填充有index的slot数量
          (并不是每个slot槽下都挂载有index索引单元,这 里统计的是所有挂载了index索引单元的slot槽的数量,hash冲突)*/
        int hashSlotCount = 0;

        /* 已该indexFile中包含的索引单元个数(统计出当前indexFile中所有slot槽下挂载的所有index索引单元的数量之和),
        如果没有hash冲突,hashSlotCount = indexCount*/
        int indexCount = 0;
         //假设建立100个槽位(总长度400)
        int soltNum = 100;

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            IndexFileItemData indexFileItemData = indexFileItemDatas.get(i);
            int keyHash = indexFileItemData.getKeyHash();

            //取模,计算第几个槽位
            int slotPos = keyHash % 100 > 0?keyHash % 100:-1*(keyHash % 100);

            // slot存放第几条数据的的位置(字节数组位置)
            int absSlotPos = 40 + slotPos * 4;

            // 存储实际数据的位置(字节数组位置)
            int absIndexPos =
                    40 + soltNum * 4
                            + indexCount * 20;



            //将hash槽的值设置为indexCount,建立索引,即第n条消息保存在XX位置
            mappedByteBuffer.putInt(absSlotPos, indexCount);

            //写入数据
            mappedByteBuffer.putInt(absIndexPos,indexFileItemData.getKeyHash());//4byte msg hashcode
            mappedByteBuffer.putLong(absIndexPos+4,indexFileItemData.getPhyOffset());//8byte pyhoffset
            mappedByteBuffer.putInt(absIndexPos+4+8,Integer.valueOf((System.currentTimeMillis()- firstDataTime)+""));//4byte (timeDiff)
            mappedByteBuffer.putInt(absIndexPos+4+8+4,0);//4byte (preIndex),暂置0,暂不考虑hash冲突的情况


            //模拟最后一个文件,写入header
            if (i == 0) {
                //该indexFile中第一条消息的存储时间
                headerByteBuffer.putLong(0, firstDataTime);
                //该indexFile种第一条消息在commitlog种的偏移量commitlog offset
                mappedByteBuffer.putLong(16, indexFileItemData.getPhyOffset());
            }
            //模拟最后一个文件,写入header
            if (i == 99) {
                //该indexFile种最后一条消息存储时间
                headerByteBuffer.putLong(8, System.currentTimeMillis());
                //该indexFile中最后一条消息在commitlog中的偏移量commitlog offset
                headerByteBuffer.putLong(24, indexFileItemData.getPhyOffset());
            }
            //已经填充有index的slot数量
            headerByteBuffer.putInt(32, hashSlotCount+1);
            //该indexFile中包含的索引单元个数
            headerByteBuffer.putInt(36, indexCount+1);
            mappedByteBuffer.force();
            System.out.println("msgId:"+indexFileItemData.getMessageId()+",keyHash:"+keyHash+",保存槽位为"+slotPos+"的数据,absSlotPos="+absSlotPos+",值index="+indexCount+",绝对位置:"+absIndexPos+",commit-phyOffset:"+indexFileItemData.getPhyOffset());

            indexCount ++;
            hashSlotCount++;

        }

    }



    //将变长字符串定长byte[],方便读取
    private static byte[] getBytes(String s, int length) {
        int fixLength = length - s.getBytes().length;
        if (s.getBytes().length < length) {
            byte[] S_bytes = new byte[length];
            System.arraycopy(s.getBytes(), 0, S_bytes, 0, s.getBytes().length);
            for (int x = length - fixLength; x < length; x++) {
                S_bytes[x] = 0x00;
            }
            return S_bytes;
        }
        return s.getBytes(StandardCharsets.UTF_8);
    }

}

运行结果:

写入消息,第:0次
totalSize:322
msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsg
transactionId:9453ba39-3982-40e9-926d-47b51d360590
commitLogOffset:0
写入消息,第:1次
totalSize:306
msgId:d0fbf80f-223b-4721-a43e-518b152decc2
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgms
transactionId:e2ef1652-58fa-4849-bf74-885c7e5db9e3
commitLogOffset:322
写入消息,第:2次
totalSize:307
msgId:199053e3-e616-4611-ab0d-e5c7af4549a9
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsg
transactionId:33d21abe-0d8e-4c0e-9c78-f415daefd767
commitLogOffset:628
写入消息,第:3次
totalSize:339
msgId:8e799d8e-3290-4f6b-ab5d-289153446994
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:98b46b96-cc88-4969-a56f-282d25799085
commitLogOffset:935
写入消息,第:4次
totalSize:320
msgId:8b78474f-b28a-4442-99a0-6f7883f0302b
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78
commitLogOffset:1274
写入消息,第:5次
totalSize:312
msgId:b33c6f31-cc96-462b-b095-99410459082c
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgms
transactionId:57420047-2539-43fa-a3f2-b2f55c7b059c
commitLogOffset:1594
写入消息,第:6次
totalSize:324
msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:29601335-3fcd-4193-b14f-140bbaf409a4
commitLogOffset:1906
写入消息,第:7次
totalSize:293
msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a
topic:Topic-test
msgBody:消息内容m
transactionId:291e54de-2ebe-41b1-b974-e81a2e9f1370
commitLogOffset:2230
写入消息,第:8次
totalSize:323
msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b
topic:Topic-test
msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:3005a39d-b8cb-4138-ae05-34b65fc135a2
commitLogOffset:2523
写入消息,第:9次
totalSize:296
msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929
topic:Topic-test
msgBody:消息内容msgm
transactionId:d42733b5-3911-4f0a-b1db-11eb45a30345
commitLogOffset:2846
commitLog数据保存完成,totalSize:10

创建consumerQueue文件开始
consumerQueue数据写入完成:{"msgLength":322,"offset":0,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":306,"offset":322,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":307,"offset":628,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":339,"offset":935,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":320,"offset":1274,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":312,"offset":1594,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":324,"offset":1906,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":293,"offset":2230,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":323,"offset":2523,"tagCode":100}
consumerQueue数据写入完成:{"msgLength":296,"offset":2846,"tagCode":100}
ConsumerQueue数据保存完成count:10

创建索引文件开始
msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0,keyHash:249765627,保存槽位为27的数据,absSlotPos=148,值index=0,绝对位置:440,commit-phyOffset:0
msgId:d0fbf80f-223b-4721-a43e-518b152decc2,keyHash:1587335015,保存槽位为15的数据,absSlotPos=100,值index=1,绝对位置:460,commit-phyOffset:322
msgId:199053e3-e616-4611-ab0d-e5c7af4549a9,keyHash:791210473,保存槽位为73的数据,absSlotPos=332,值index=2,绝对位置:480,commit-phyOffset:628
msgId:8e799d8e-3290-4f6b-ab5d-289153446994,keyHash:1460275929,保存槽位为29的数据,absSlotPos=156,值index=3,绝对位置:500,commit-phyOffset:935
msgId:8b78474f-b28a-4442-99a0-6f7883f0302b,keyHash:1174005465,保存槽位为65的数据,absSlotPos=300,值index=4,绝对位置:520,commit-phyOffset:1274
msgId:b33c6f31-cc96-462b-b095-99410459082c,keyHash:-1695757800,保存槽位为0的数据,absSlotPos=40,值index=5,绝对位置:540,commit-phyOffset:1594
msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0,keyHash:1334295408,保存槽位为8的数据,absSlotPos=72,值index=6,绝对位置:560,commit-phyOffset:1906
msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a,keyHash:1287318090,保存槽位为90的数据,absSlotPos=400,值index=7,绝对位置:580,commit-phyOffset:2230
msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b,keyHash:239865974,保存槽位为74的数据,absSlotPos=336,值index=8,绝对位置:600,commit-phyOffset:2523
msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929,keyHash:-1173357775,保存槽位为75的数据,absSlotPos=340,值index=9,绝对位置:620,commit-phyOffset:2846


2.读取consumeQueue文件,并根据offset从commitLog读取一条完整的消息

package org.apache.rocketmq.test.smoke;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ConsumeQueueMessageReadTest {

   public static MappedByteBuffer mappedByteBuffer = null;
    private static int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        fileChannel.close();
        mappedByteBuffer.position(0);
        //根据索引下标读取索引,实际情况是用户消费的最新点位,存在在broker的偏移量文件中
        int index = 0 ;
        for(int i =index;i

运行结果:

=================commitlog读取偏移量为0的消息===================
totalSize:322
msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0                            
topic:Topic-test                                                      
queueOffset:0
bodySize:42
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsg
transactionId:9453ba39-3982-40e9-926d-47b51d360590                            
commitLogOffset:0
=================commitlog读取偏移量为322的消息===================
totalSize:306
msgId:d0fbf80f-223b-4721-a43e-518b152decc2                            
topic:Topic-test                                                      
queueOffset:1
bodySize:26
body:消息内容msgmsgmsgmsgms
transactionId:e2ef1652-58fa-4849-bf74-885c7e5db9e3                            
commitLogOffset:322
=================commitlog读取偏移量为628的消息===================
totalSize:307
msgId:199053e3-e616-4611-ab0d-e5c7af4549a9                            
topic:Topic-test                                                      
queueOffset:2
bodySize:27
body:消息内容msgmsgmsgmsgmsg
transactionId:33d21abe-0d8e-4c0e-9c78-f415daefd767                            
commitLogOffset:628
=================commitlog读取偏移量为935的消息===================
totalSize:339
msgId:8e799d8e-3290-4f6b-ab5d-289153446994                            
topic:Topic-test                                                      
queueOffset:3
bodySize:59
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:98b46b96-cc88-4969-a56f-282d25799085                            
commitLogOffset:935
=================commitlog读取偏移量为1274的消息===================
totalSize:320
msgId:8b78474f-b28a-4442-99a0-6f7883f0302b                            
topic:Topic-test                                                      
queueOffset:4
bodySize:40
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78                            
commitLogOffset:1274
=================commitlog读取偏移量为1594的消息===================
totalSize:312
msgId:b33c6f31-cc96-462b-b095-99410459082c                            
topic:Topic-test                                                      
queueOffset:5
bodySize:32
body:消息内容msgmsgmsgmsgmsgmsgms
transactionId:57420047-2539-43fa-a3f2-b2f55c7b059c                            
commitLogOffset:1594
=================commitlog读取偏移量为1906的消息===================
totalSize:324
msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0                            
topic:Topic-test                                                      
queueOffset:6
bodySize:44
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:29601335-3fcd-4193-b14f-140bbaf409a4                            
commitLogOffset:1906
=================commitlog读取偏移量为2230的消息===================
totalSize:293
msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a                            
topic:Topic-test                                                      
queueOffset:7
bodySize:13
body:消息内容m
transactionId:291e54de-2ebe-41b1-b974-e81a2e9f1370                            
commitLogOffset:2230
=================commitlog读取偏移量为2523的消息===================
totalSize:323
msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b                            
topic:Topic-test                                                      
queueOffset:8
bodySize:43
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:3005a39d-b8cb-4138-ae05-34b65fc135a2                            
commitLogOffset:2523
=================commitlog读取偏移量为2846的消息===================
totalSize:296
msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929                            
topic:Topic-test                                                      
queueOffset:9
bodySize:16
body:消息内容msgm
transactionId:d42733b5-3911-4f0a-b1db-11eb45a30345                            
commitLogOffset:28

3.根据messageId读取indexFile,然后根据偏移量从CommitLog读取一条完整的消息

package org.apache.rocketmq.test.smoke;

import java.io.IOException;
import java.net.URI;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class IndexFileMessageReadTest {

    public static MappedByteBuffer mappedByteBuffer = null;
    public static void main(String[] args) throws IOException {
        String msgId = "8b78474f-b28a-4442-99a0-6f7883f0302b";
        readByMessageId(msgId);

    }

    private static void readByMessageId(String messageId) throws IOException {
        FileChannel indexFileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer indexMappedByteBuffer = indexFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        indexFileChannel.close();

        System.out.println("============get indexFile header===============");
        System.out.println("beginTimestampIndex:"+indexMappedByteBuffer.getLong());
        System.out.println("endTimestampIndex:"+indexMappedByteBuffer.getLong());
        System.out.println("beginPhyoffsetIndex:"+indexMappedByteBuffer.getLong());
        System.out.println("endPhyoffsetIndex:"+indexMappedByteBuffer.getLong());
        System.out.println("hashSlotcountIndex:"+indexMappedByteBuffer.getInt());
        System.out.println("indexCountIndex:"+indexMappedByteBuffer.getInt());
        System.out.println("");

        int keyHash = messageId.hashCode();

        //取模,计算第几个槽位
        int slotPos = keyHash % 100 > 0?keyHash % 100:-1*(keyHash % 100);
        System.out.println("messageId:"+messageId+ ",取模为:"+slotPos);

        // slot在文件中的字节数组位置
        int absSlotPos = 40 + slotPos * 4;
        System.out.println("哈希槽的字节数组位置:(40+"+slotPos+"*4)="+absSlotPos);


        //获取hash槽上存取的件索引,第几个文件
        int index =indexMappedByteBuffer.getInt(absSlotPos);

        //计算数据需要存储的偏移量
        int absIndexPos =
                40 + 100 * 4
                        + index * 20;

        System.out.println("第几个文件index="+index+",实际存储数据的字节数组位置:(40 + 100 * 4+index *20)="+absIndexPos);

        long keyHash1 = indexMappedByteBuffer.getInt(absIndexPos);
        long pyhOffset = indexMappedByteBuffer.getLong(absIndexPos+4);
        int timeDiff = indexMappedByteBuffer.getInt(absIndexPos+4+8);
        int preIndexNo = indexMappedByteBuffer.getInt(absIndexPos+4+8+4);


        System.out.println("从index获取到的commitLog偏移量为:"+pyhOffset);
        System.out.println("");

        readCommitLogByOffset((int)pyhOffset);

    }


    public static MappedByteBuffer initFileChannel() throws IOException {
        if(mappedByteBuffer == null){
          FileChannel  commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);

             mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            commitLogfileChannel.close();
        }

        return mappedByteBuffer;

    }


    /*
    *
    * 根据偏移量读取CcommitLog
    * */
    public static void readCommitLogByOffset(int offset) throws IOException {


        /* 存放顺序,读时候保持顺序一致
            b.putLong(totalSize);//totalSize
            b.put(getBytes(msgId, 64));//msgId
            b.put(getBytes(topic, 64));//topic,定长64
            b.putLong(queueOffset);//索引偏移量
            b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
            b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
            b.put(getBytes(transactionId, 64));
            b.putLong(commitLogOffset);//commitLogOffset
        */
        System.out.println("=================commitlog读取偏移量为"+offset+"的消息===================");

        MappedByteBuffer  mappedByteBuffer = initFileChannel();
        mappedByteBuffer.position(offset);


        long totalSize = mappedByteBuffer.getLong();//消息长度

        byte[] msgIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(msgIdByte);

        byte[] topicByte = new byte[64];// 固定是64
        mappedByteBuffer.get(topicByte);
        long queueOffset = mappedByteBuffer.getLong();
        Long bodySize = mappedByteBuffer.getLong();
        int bSize = Integer.valueOf(bodySize+"");
        byte[] bodyByte = new  byte[bSize];//bodySize 长度不固定
        mappedByteBuffer.get(bodyByte);
        byte[] transactionIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(transactionIdByte);
        long commitLogOffset = mappedByteBuffer.getLong();//偏移量
        System.out.println("totalSize:"+totalSize);
        System.out.println("msgId:"+new String(msgIdByte));
        System.out.println("topic:"+new String(topicByte));
        System.out.println("queueOffset:"+queueOffset);
        System.out.println("bodySize:"+bodySize);
        System.out.println("body:"+new String(bodyByte));
        System.out.println("transactionId:"+new String(transactionIdByte));
        System.out.println("commitLogOffset:"+commitLogOffset);

    }



    public static byte[] toByteArray(long number) {
        byte length = Long.BYTES;
        byte[] bytes = new byte[length];

        for (byte i = 0; i < length; i++) {
            bytes[length - 1 - i] = (byte) number;
            number >>= 8;
        }

        return bytes;

    }

}

运行结果:

============get indexFile header===============
beginTimestampIndex:1669554286826
endTimestampIndex:1669552196010
beginPhyoffsetIndex:0
endPhyoffsetIndex:31259
hashSlotcountIndex:10
indexCountIndex:10

messageId:8b78474f-b28a-4442-99a0-6f7883f0302b,取模为:65
哈希槽的字节数组位置:(40+65*4)=300
第几个文件index=4,实际存储数据的字节数组位置:(40 + 100 * 4+index *20)=520
从index获取到的commitLog偏移量为:1274

=================commitlog读取偏移量为1274的消息===================
totalSize:320
msgId:8b78474f-b28a-4442-99a0-6f7883f0302b                            
topic:Topic-test                                                      
queueOffset:4
bodySize:40
body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78                            
commitLogOffset:1274

本文基于java NIO实现了RocketMQ的文件系统的最精简的实现,希望能帮助相关开发人员了解文件系统底层的实现原理。欢迎一起交流讨论,不足的地方欢迎指正。

页面更新:2024-05-02

标签:文件   数组   开发者   文件系统   字节   源码   原理   位置   消息   数据

1 2 3 4 5

上滑加载更多 ↓
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top