进程间通信(IPC) 系列 - Posix 消息队列

消息队列是 Linux IPC 中很常用的一种通信方式,今天分析一下Posix消息队列,本文中所讲的消息队列均为Posix消息队列。

什么是 Posix 消息队

消息队列可以认为它是一个消息链表,有足够写权限的进程可以往队列中发送消息,有足够读权限的进程可以往队列中接收消息。

每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这也就说明消息队列具有随内核的持续性,也就是说进程关闭后,消息队列依然存在,除非内核重新自举。


Posix 消息队列有如下特点


Posix 消息队列中的每条消息通常具有以下属性


消息队列的基本操作

打开或创建一个 posix 消息队列操作接口


mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
Link with -lrt.

参数 name 为 posix IPC 名字, 即将要被打开或创建的消息队列对象,为了便于移植,需要指定为“/name”的格式。

参数oflag必须要有O_RDONLY(只读)、标志O_RDWR (读写), O_WRONLY(只写)之一,除此之外还可以指定 O_CREAT(没有该对象则创建)、O_EXCL(如果 O_CREAT 指定,但 name 不存在,就返回错误),O_NONBLOCK(以非阻塞方式打开消息队列,在正常情况下mq_receive和mq_send 函数会阻塞的地方,使用该标志打开的消息队列会返回 EAGAIN 错误)。

当操作一个新队列时,使用 O_CREAT 标识,此时后面两个参数需要被指定,参数 mode 为指定权限位,attr 指定新创建队列的属性。


关闭进程描述符操作接口

int mq_close(mqd_t mqdes);  

关闭之后告诉进程不在使用该描述符,但消息队列不会从系统中删除。


系统中删除某个消息队列操作接口

int mq_unlink(const char *name);  

参数为mq_open()函数第一个参数,调用该接口后删除会马上发生,即使该队列的描述符引用计数仍然大于0。


关于消息队列中设置和和获取消息队列属性接口

mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); 


每个消息队列有四个属性, mq_getattr返回所有的这些属性, mq_setattr设置其中的某个属性

消息队列的消息具体属性如下

struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
}


指向mq_attr的指针可以作为mq_open函数的第四个参数传递, 从而在创建队列初就设置好每个消息的最大长度和允许存在的最大消息数量, 另外两个成员被忽略。

mq_setattr给所指定队列设置属性, 但是只使用由attr指向的mq_attr结构的mq_flags成员, 以设置或清除非阻塞标志, 其他三个成员则被忽略(其中两个只能在创建队列时指定, 还有一个及时获取)。当然, mq_setattr的最后一个参数用于接收之前的属性和当前状态


向消息队列放置和取走消息的操作接口

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); 
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); 


参数msg_ptr为指向消息的指针。

msg_len为消息长度,该值不能大于属性值中mq_msgsize的值。

msg_prio为优先级,消息在队列中将按照优先级大小顺序来排列消息。

如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间再次允许放置消息或该调用被信号打断;如果O_NONBLOCK被指定,mq_send()那么将不会阻塞,而是返回EAGAIN错误。

如果队列空,mq_receive()函数将阻塞,直到消息队列中有新的消息;如果O_NONBLOCK被指定,mq_receive()那么将不会阻塞,而是返回EAGAIN错误。

消息队列的原理分析

消息队列的初始化

static int __init init_mqueue_fs(void)
{
        ...

        //注册消息队列文件系统
        error = register_filesystem(&mqueue_fs_type);

        //构建struct vfsmount结构主要是获取文件系统的super_block对象与根目录的inode与dentry对象,并将这些对象加入到系统链表
        if (IS_ERR(mqueue_mnt = kern_mount(&mqueue_fs_type))) {
            ...
        }

        queues_count = 0;
        spin_lock_init(&mq_lock);

        return 0;

out_filesystem:

out_sysctl:

        return error;
}

__initcall(init_mqueue_fs);


消息队列文件系统初始化很简单,主要工作如下:


mq_open 接口分析

asmlinkage long sys_mq_open(const char __user *u_name, int oflag, mode_t mode, struct mq_attr __user *u_attr)
{

        ...


        //获取一个未使用的文件描述符
        fd = get_unused_fd();

        mutex_lock(&mqueue_mnt->mnt_root->d_inode->i_mutex);
        //获取一个名字为name的dentry结构  
        dentry = lookup_one_len(name, mqueue_mnt->mnt_root, strlen(name));

        mntget(mqueue_mnt);
        //若是新创建
        if (oflag & O_CREAT) {
                if (dentry->d_inode) {  /* entry already exists */
                        audit_inode(name, dentry);
                        error = -EEXIST;
                        if (oflag & O_EXCL)
                                goto out;
                        //若已经存在,则直接打开file
                        filp = do_open(dentry, oflag);
                } else {
                        //创建一个file结构,把inode和dentry与之关联
                        filp = do_create(mqueue_mnt->mnt_root, dentry,
                                                oflag, mode, u_attr);
                }
        } else { //否则,直接获取
                error = -ENOENT;
                if (!dentry->d_inode)
                        goto out;
                audit_inode(name, dentry);
                filp = do_open(dentry, oflag);
        }

        if (IS_ERR(filp)) {
                error = PTR_ERR(filp);
                goto out_putfd;
        }

        //给描述符设置close_on_exec标志
        set_close_on_exec(fd, 1);

        //文件描述符与file进行关联
        fd_install(fd, filp);

        goto out_upsem;

        ...

        return fd;
}


mq_open 的操作很简单,操作如下:


mq_send 接口分析

当使用 mq_send 发送消息时,比如如下调用,

mq_send(mqd, msg, msg_len, msg_prio) 

发送消息时,最终会调用如下函数

mq_timedsend(mqd, msg, msg_len, msg_prio, NULL)


asmlinkage long sys_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec __user *u_abs_timeout)
{ 
...

//获取file结构
filp = fget(mqdes);
if (unlikely(!filp))
goto out;

inode = filp->f_path.dentry->d_inode;
//获取inode 下的 mqueue_inode_info
info = MQUEUE_I(inode);

//把用户传来的消息转换成内核消息链表,若用户消息长度大于PAGE_SIZE,则链表结构为 msg_msg-> msg_msgseg -> msg_msgseg 每个节点都已一个页大小,除了最后一个节点
//若用户消息大小小于PAGE_SIZE,链表只有一个节点 msg_msg 
// msg_ptr 指向链表头节点 msg_msg
msg_ptr = load_msg(u_msg_ptr, msg_len);
//msg_msg 中记录消息的总长度,和 消息优先级
msg_ptr->m_ts = msg_len; 
msg_ptr->m_type = msg_prio;

spin_lock(&info->lock);
//若消息数量达到最大值
if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
//若非阻塞,则返回
if (filp->f_flags & O_NONBLOCK) {
spin_unlock(&info->lock);
ret = -EAGAIN;
//若超时时间小于0,则返回超时时间
} else if (unlikely(timeout < 0)) {
spin_unlock(&info->lock);
ret = timeout;
} else {
//阻塞调用,则进程休眠,把wait加入到info中的SEND等待队列中
wait.task = current;
wait.msg = (void *) msg_ptr;
wait.state = STATE_NONE;
ret = wq_sleep(info, SEND, timeout, &wait);
}

} else {
//若info 接收队列中有阻塞的进程,则把要发送的数据挂到阻塞的进程的消息节点上,然后唤醒阻塞的接收进程
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
pipelined_send(info, msg_ptr, receiver);
} else {
//把消息挂到消息队列上,并进行通知
msg_insert(msg_ptr, info);
__do_notify(info);
}
inode->i_atime = inode->i_mtime = inode->i_ctime =
CURRENT_TIME;
spin_unlock(&info->lock);
ret = 0;
}

return ret;
}

sys_mq_timedsend 功能如下:


mq_receive 接口分析

当使用 mq_receive 接收消息时,比如如下调用,

接收消息时,最终会调用如下函数

mq_timedreceive(mqd, msg, msg_len, &msg_prio, NULL)具体实现如下
asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr, size_t msg_len, unsigned int __user *u_msg_prio, const struct timespec __user *u_abs_timeout)
{

        ...

        //获取file结构
        filp = fget(mqdes);

        inode = filp->f_path.dentry->d_inode;
        //获取inode 下的 mqueue_inode_info
        info = MQUEUE_I(inode);
        audit_inode(NULL, filp->f_path.dentry);

        spin_lock(&info->lock);
        //消息队列当前没有消息
        if (info->attr.mq_curmsgs == 0) {
                //若非阻塞,则返回
                if (filp->f_flags & O_NONBLOCK) {
                        spin_unlock(&info->lock);
                        ret = -EAGAIN;
                        msg_ptr = NULL;
                //若超时时间小于0,则返回超时时间
                } else if (unlikely(timeout < 0)) {

                } else {
                        //阻塞调用,则进程休眠,把wait加入到info中的RECV等待队列中
                        wait.task = current;
                        wait.state = STATE_NONE;
                        ret = wq_sleep(info, RECV, timeout, &wait);
                        msg_ptr = wait.msg;
                }
        } else {
                //从消息队列的最高优先级中获取一个消息
                msg_ptr = msg_get(info);

                inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;

                //由于从消息队列中已经取出一个消息了,有剩余的空间,因此把等待SEND队列上的进程的消息挂到消息队里中,然后唤醒发送消息的进程
                pipelined_receive(info);
                spin_unlock(&info->lock);
                ret = 0;
        }
        if (ret == 0) {
                ret = msg_ptr->m_ts;
                //把消息拷贝到用户态
                if ((u_msg_prio && put_user(msg_ptr->m_type, u_msg_prio)) ||
                        store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)) {
                        ret = -EFAULT;
                }
                //释放消息空间
                free_msg(msg_ptr);
        }
out_fput:
        fput(filp);
out:
        return ret;
}

sys_mq_timedreceive 功能如下:


mq_unlink 接口分析

asmlinkage long sys_mq_unlink(const char __user *u_name)
{
        ...
        //引用计数减1,删除目录项
        err = vfs_unlink(dentry->d_parent->d_inode, dentry);

        return err;
}


该函数作用很简单,就是减少引用计数,删除目录项。


经过上述的分析,有关消息队列的内存结构可以总结如下:

展开阅读全文

页面更新:2024-05-23

标签:队列   进程   消息   优先级   节点   函数   属性   接口   参数   结构   通信   系列

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top