进程间通信之消息队列
发布日期:2021-05-06 19:40:57 浏览次数:20 分类:精选文章

本文共 12065 字,大约阅读时间需要 40 分钟。

消息队列是内核地址空间中的内部链表,通过Linux内核在各个进程之间传递内容,消息顺序地发送到消息队列中,并且以几种不同的方式从队列中获取,每个消息队列可以用IPC标识符唯一的进行标识,内核中的消息队列是通过IPC的标识符来区别的,不同的消息队列之间是相互独立的,每个消息队列中的消息又构成一个独立的链表。

消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型地数据结构。我们可以通过发送消息来避免命名管道地同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。
消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向消息队列中按照一定规则添加新消息;对消息队列有读权限的进程可以从消息队列中读走消息。消息队列是随内核持续地。
消息队列与管道不同的是,消息队列是基于消息的,而管道是基于字节流的,且消息队列的读取不一定是先入先出。
消息队列也有类似管道一样的不足之处,就是每个消息的最大长度是有上限的(MSGMAX),通常为8192字节,每个消息队列的总的字节数也是有上限的(MSGMNB),系统上消息队列的总数也有一个上限(MSGMNI)。

消息队列的原理

MQ(Message Queue,消息队列)传递的是消息,消息即是我们需要在进程间传递的数据。MQ采用链表来实现消息队列,该链表是由系统内核进行维护,系统中可能有很多的MQ,每个MQ用消息队列描述符(qid)来区分,qid是唯一的,用来区分不同的MQ。在进行进程间通信时,一个进程将消息加到MQ尾端,另一个进程从消息队列中读取消息(不一定以先进先出的规则来读取消息,可以按照消息类型字段读取消息),这样就实现了进程间的通信。如下模型所示:
在这里插入图片描述
消息队列中的数据结构
1.消息缓冲区结构
向消息队列发送消息时,必须组成合理的数据结构。Linux系统定义了一个模板数据结构msgbuf,程序员可以以这个结构为模板来定义自己的消息结构。在头文件<linux/msg.h>中,定义如下:

#include 
struct msgbuf{ long mtype; //消息类型 char mtext[1]; //消息数据};

在结构体msgbuf中有两个数据成员:

mtype:消息类型,以正数来表示。用户可以给某个消息设定一个类型,可以在消息队列中正确地发送和接收自己的消息。例如,在socket编程过程中,一个服务器可以接受多个客户端的连接,可以为每个客户端设定一个消息类型,服务器和客户端之间的通信可以通过此消息类型来发送和接收消息,并且多个客户端之间通过消息类型来区分。
mtext:消息数据。类型为char,长度为1。在构建自己的消息结构时,这个域能存放任意形式的任意数据,应用程序编程人员可以重新定义msgbuf结构。例如:

#include 
struct msgbuf{ long mtype; //消息类型 char mtext[10]; //消息数据 long length;};

此上这个结构体定义与系统定义不一致,但是mtype是一致的。消息在通过内核在进程之间收发时,内核不对mtext域进行转换,任意的消息都可以发送。具体的转换工作是在应用程序之间进行的。但是,消息的大小,存在一个内部的限制。在Linux中,消息的大小在Linux/msg.h中定义为:

#define MSGMAX 8192
消息的大小不能超过8192个字节,这其中包括mtype成员,它的长度是4个字节(long类型)。
从消息的结构来看,其在两方面受到制约:首先,它必须小于系统规定的上限值;其次,它必须以long int长整数开始,接收者函数将利用这个长整数确定消息的类型。

2.msqid_ds结构

Linux内核中,每个消息队列都维护一个结构体,内核msqid_ds结构——IPC对象分为3类,每一类都有一个内部数据结构,该数据结构是由内核维护的。对于消息队列而言,它的内部数据结构是msqid_ds结构,此结构体保存着消息队列当前状态信息。对于系统上创建的每个消息队列,内核均为其创建、存储和维护该结构的一个实例。该结构在linux/msg.h中定义,如下:

#include 
struct msqid_ds{ struct ipc_perm msg_perm; time_t msg_stime; //发送队列的最后一个时间戳 time_t msg_rtime; //从队列中获取的最后一个消息的时间戳 time_t msg_ctime; //对队列进行最后一次变动的时间戳 unsigned long msg_cbytes; //在队列上所驻留的字节总数 msgqnum_t msg_qnum; //当前处于队列中的消息数目 msglen_t msg_qbytes; //队列中能容纳的字节的最大数目 pid_t msg_lspid; //发送最后一个消息进程的PID pid_t msg_lrpid; //接收最后一个消息进程的PID};

msg_perm:它是ipc_perm结构的一个实例,ipc_perm结构是在linux/ipc.h中定义的,用于存放消息队列的许可权限信息,其中包括访问许可信息,以及队列创建者的有关信息。

3.ipc_perm结构

系统为每一个IPC对象保存一个ipc_perm结构体,内核把IPC对象的许可权限信息存放在ipc_perm类型的结构中。例如在前面描述的某个消息队列的内部结构中,msg_perm成员就是ipc_perm类型的,它的定义是存储在文件linux/ipc.h中,如下:

#include 
struct ipc_perm{ key_t key; //函数msgget()使用的键值 uid_t uid; //用户的UID gid_t gid; //用户的GID uid_t cuid; //建立者的UID gid_t cgid; //建立者的GID unsigned short mode; //权限 unsigned short seq; //序列号};

消息队列在内核中的表示:

内核中的消息队列关系:
作为IPC的消息队列,其消息的传递是通过Linux内核来进行的。如下图所示。在消息发送和接收的时候,内核通过一个比较巧妙的设置来实现消息插入队列的动作和从消息中查找消息的算法。结构list_head形成一个链表,而结构msg_msg中的m_list成员是一个struct list_head类型的变量,通过此变量,消息形成了一个链表,在查找和插入时,对m_list域进行偏移操作就可以找到对应的消息体位置。内核中的代码在头文件<linux/msg.h>和<linux/msg.c>中,主要的实现是插入消息和取出消息的操作。
在这里插入图片描述
消息队列函数及使用:
*1.ftok()函数:*将路径名和项目的表示符转变为一个系统V的IPC键值。
函数原型为:

#include 
#include
key_t ftok(const char* pathname,int project_id);

其中pathname是已经存在的目录,一般情况下取根目录下的tmp文件(“/tmp”),而project_id是一个8位的值,通常用a、b等表示。如果创建失败,返回-1。

#include 
#include
int main(){ key_t key; char* msgpath="/tmp"; key=ftok(msgpath,'a'); if(key != -1) { printf("ox%x\n",key); printf("键值成功建立!\n"); } else { printf("键值建立失败!\n"); return -1; } return 0;}

在这里插入图片描述*2.msgget()函数:*创建一个新的消息队列,或访问一个现有的消息队列。

函数原型为:

#include 
#include
#include
int msgget(key_t key,int msgflg);

msgget()函数的第一个参数是键值,可以用ftok()函数生成,此键值将被拿来与内核中其他消息队列的现有关键字值相比较。比较之后,打开或者访问操作依赖于msgflg参数的内容。

IPC_CREAT:如果在内核中不存在该消息队列,则创建;如果存在,则打开。
IPC_EXCL:通常与IPC_CREAT参数一起使用,如果消息队列不存在则创建,如果存在则出错返回。

示例:创建一个消息队列,名字为1234,接着再打开这个消息队列。

#include 
#include
#include
#include
#include
#include
int main(){ int msqid; msqid=msgget(1234,IPC_CREAT | 0666); //键值可以直接指定也可以通过ftok()函数生成,flag参数也可以结合权限使用 if(msqid == -1) { perror("msgget"); return -1; } else { printf("消息队列创建成功!\n"); } msqid=msgget(1234,0); //flag=0表示以原有权限打开,即666 printf("msqid=%d\n",msqid); return 0;}

在这里插入图片描述ox4d2即十六进制的1234。

*3.msgsnd()函数:*把一条消息添加到消息队列中。

函数原型:

#include 
#include
#include
int msgsend(int msqid,void* msgp,size_t msgsz,int msgflag);

此函数若执行成功,返回0,失败返回-1。此函数的第一个参数为队列标识符,是调用msgget()函数获得的返回值;第二个参数是msgp,是一个void类型的指针,指向一个消息缓冲区,发送的消息必须是一个结构体;第三个参数是msgsz,包含这消息的大小,以字节为单位,不包括消息类型的长度的4个字节;第四个参数是flag标志,控制着当前消息队列满或者达到系统上限时将要发生的事情,一般设置为0,表示忽略,也可以设置为IPC_NOWAIT,表示队列满不等待,返回EAGAIN错误。如果消息队列已满,则消息将不会被写入队列中,如果没有指定IPC_NOWAIT,则调用进程将被阻塞(中断),直到可以写消息为止。

*4.msgrcv()函数:*从一个消息队列接收消息

当获得队列标识符后,用户就可以在该消息队列上执行消息队列的接收操作。msgrcv()函数用于介绍队列标识符中的消息,原型为:

#include 
#include
#include
ssize_t msgrcv(int msqid,void* msgp,size_t msgsz,long msgtype,int msgflag);

函数执行成功,返回实际放到接收缓冲区里去的字符个数,失败返回-1。

msqid:由msgget()函数返回的消息队列标识码
msgp:指针,指向准备接收的消息
msgsz:是msgp之乡的消息缓冲区结构的大小,不包括保存消息类型所谓那个long int长整形的长度
msgtype:指定要从队列中获取的消息类型,当msgtype=0时,返回消息队列第一条消息;当msgtype>0时,返回消息队列第一条消息类型等于msgtype的消息;当msgtype<0时,返回队列第一条消息类型小于等于msgtype绝对值的消息,并且是满足条件的消息类型最小的消息。
msgflag:控制着队列中没有相应类型的消息可供接收时将要发生的事情,当msgflag=IPC_NOWAIT时,表示队列没有可读消息,不等待,返回ENOMSG错误;当msgflag=MSG_NOERROR时,表示消息大小超过msgsz时被截断;当msgtype>0且msgflag-MSG_EXCEPT时,接收类型不等于msgtype的第一条消息。

*5.msgctl()函数:*消息队列的控制函数

#include 
#include
#include
int msgctl(int msgid,int cmd,struct msqid_ds* buf);

函数执行成功,返回0,失败返回-1。

msqid:由msgget()函数返回的消息队列标识码
cmd:将要采取的动作,取值有三个
IPC_STAT 把msqid_ds结构中的数据设置为消息队列的当前关联值
IPC_SET 在进程由足够权限的前提下,把消息队列的当前关联值设置为msqid_ds 数据结构中给出的值
IPC_RMID 删除消息队列
buf:消息队列的结构

示例1:删除消息队列

msgctl.c

#include 
#include
#include
#include
#include
#include
#include
int main(){ int msgid; msgid=msgget(1234,0666 | IPC_CREAT); if(msgid == -1) { printf("创建消息队列失败!\n"); return -1; } else { printf("消息队列创建成功!\n"); msgid=msgget(1234,0); printf("msgid=%d\n",msgid); } //msgctl(msgid,IPC_RMID,NULL); return 0;}

先将控制函数屏蔽掉运行文件,使其成功创建消息队列,成功如下图:

在这里插入图片描述
之后,再运行完整的文件,发现消息队列已经被删除
在这里插入图片描述示例2:更改消息队列的参数,如权限,使用msgctl()函数,cmd取值为IPC_SET

msgctl_set.c

#include 
#include
#include
#include
#include
#include
int main(){ int msgid; msgid=msgget(1234,0666 | IPC_CREAT); if(msgid == -1) { printf("消息队列创建失败!\n"); return -1; } else { printf("消息队列创建成功!\n"); msgid=msgget(1234,0); printf("msgid=%d\n",msgid); } struct msqid_ds buf; msgctl(msgid,IPC_STAT,&buf); printf("权限:%o\n",buf.msg_perm.mode); sscanf("600","%o",(unsigned int*)&buf.msg_perm.mode); msgctl(msgid,IPC_SET,&buf); return 0;}

创建时权限为666,之后修改为600

在这里插入图片描述**应用案例:**使用IPC_PRIVATE创建的IPC对象,key值属性为0,和IPC对象的编号就没有了对应关系。这样毫无关系的进程就不能通过key值来得到IPC对象的编号,这种方式创建的IPC对象的key值都是0。因此,这种方式产生的IPC对象和无名管道类似,不能用于毫无关系的进程间通信,通常使用在有亲缘关系的进程中。

#include "../Head.c"//消息缓冲区结构struct msgbuff{    long mtype;    char mtext[100];};int main(){    //创建私有消息队列    int msgid=msgget(IPC_PRIVATE,IPC_CREAT | 0777);    //如果创建失败    if(msgid == -1)    {        printf("创建消息队列失败!\n");    }    //复制进程    int cid=fork();    //父进程    if(cid>0)    {        msgbuff msg={1,"Hello!"};        //发送消息        msgsnd(msgid,&msg,sizeof(msg),IPC_NOWAIT);        sleep(2);        //接收消息        msgrcv(msgid,&msg,sizeof(msg),1,IPC_NOWAIT);        printf("%s\n",msg.mtext);    }    //子进程    else if(cid == 0)    {        //子进程先接收消息        msgbuff msg;        msgrcv(msgid,&msg,sizeof(msg),1,IPC_NOWAIT);        printf("%s\n",msg.mtext);        strcpy(msg.mtext,"hello,too!");        msgsnd(msgid,&msg,sizeof(msg),IPC_NOWAIT);    }    return 0;}

在这里插入图片描述示例:使用消息队列来进行进程间通信。进程A写入字符串数据到消息队列中,进程B从消息队列中读取数据。

write_msg.c

#include "../Head.c"struct msgbuf{    long mtype;    char mtext[255];};int main(){    //创建消息队列,用key=123来唯一标识此队列    int msgid=msgget(123,IPC_CREAT | 0666);    //如果消息队列创建成功    if(msgid != -1)    {        //初始化要发送的消息        struct msgbuf sendbuf;        sendbuf.mtype=1;        strcpy(sendbuf.mtext,"我是发送进程,用来写入并发送数据!\n");        //开始发送消息        if(msgsnd(msgid,&sendbuf,sizeof(sendbuf.mtext),0))        {            printf("消息发送成功!\n");        }        else        {            perror("msgsnd:");        }    }    else    {        perror("msgget:");    }    return 0;}

read_msg.c

#include "../Head.c"struct msgbuf{    long mtype;    char mtext[255];};int main(){    //获取消息队列    int msgid=msgget(123,IPC_CREAT | 0666);    if(msgid != -1)    {        struct msgbuf recvbuf;        //接收第一条消息,存到recvbuf中        if(msgrcv(msgid,&recvbuf,sizeof(recvbuf.mtext),0,IPC_NOWAIT) != -1)        {            printf("读取到的消息为:%s",recvbuf.mtext);            //接收到消息之后就删除此队列            if(msgctl(msgid,IPC_RMID,0) != -1)            {                printf("删除队列成功!\n");            }            else            {                perror("msgctl:");            }        }        else        {            perror("msgrcv:");        }    }    else    {        perror("msgget:");    }    return 0;}

在这里插入图片描述综合应用

comm.h

#ifndef _COMM_H_#define _COMM_H#define SERVER_TYPE 1#define CLIENT_TYPE 2#include "../Head.c"struct msgbuf{    long mtype;    char mtext[1024];};int CreatMessageQueue();int GetMessageQueue();int DestoryMessageQueue(int msgid);int SengMessageQueue(int msgid,int who,char*msg);int ReceiveMessageQueue(int msgid,int receType,char out[]);#endif

comm.c

#include "comm.h"static int CommandMessageQueue(int flag){    key_t key=ftok("/tmp",0x6666);    if(key<0)    {        perror("ftok:");        return -1;    }    int msgid=msgget(key,flag);    if(msgid<0)    {        perror("msgget:");    }    return msgid;}int CreatMessageQueue(){    return CommandMessageQueue(IPC_CREAT | IPC_EXCL | 0666);}int GetMessageQueue(){    return CommandMessageQueue(IPC_CREAT);}int DestoryMessageQueue(int msgid){    if(msgctl(msgid,IPC_RMID,NULL) < 0)    {        perror("msgctl:");        return -1;    }    return 0;}int SendMessageQueue(int msgid,int who,char* msg){    struct msgbuf sendbuf;    sendbuf.mtype=who;    strcpy(sendbuf.mtext,msg);    if(msgsnd(msgid,(void*)&sendbuf,sizeof(sendbuf.mtext),0) < 0)    {        perror("msgsnd:");        return -1;    }    return 0;}int ReceiveMessageQueue(int msgid,int recvType,char out[]){    struct msgbuf recvbuf;    int size=sizeof(recvbuf.mtext);    if(msgrcv(msgid,(void*)&recvbuf,size,recvType,0) < 0)    {        perror("msgrcv:");        return -1;    }    strncpy(out,recvbuf.mtext,size);    out[size]=0;    return 0;}

server.c

#include "comm.h"int main(){    int msgid=CreatMessageQueue();    char buf[1024]={0};    while(1)    {        ReceiveMessageQueue(msgid,CLIENT_TYPE,buf);        if(strcasecmp("quit",buf) == 0)        {            break;        }        printf("client# %s\n",buf);        printf("请输入:");        fflush(stdout);        ssize_t s=read(0,buf,sizeof(buf));        if(s>0)        {            buf[s-1]=0;            SendMessageQueue(msgid,SERVER_TYPE,buf);            printf("发送完毕,等待接收...\n");        }    }    DestoryMessageQueue(msgid);    return 0;}

client.c

#include "comm.h"int main(){    int msgid =GetMessageQueue();    char buf[1024]={0};    while(1)    {        printf("请输入:");        fflush(stdout);        ssize_t s=read(0,buf,sizeof(buf));        if(s>0)        {            buf[s-1]=0;            SendMessageQueue(msgid,CLIENT_TYPE,buf);            if(strcasecmp("quit",buf) == 0)            {                break;            }            printf("发送完毕,等待接收...\n");        }        ReceiveMessageQueue(msgid,SERVER_TYPE,buf);        printf("server# %s\n",buf);    }    return 0;}

Makefile

.PHONY:allall:client serverclient:client.c comm.c	gcc -o $@ $^server:server.c comm.c	gcc -o $@ $^.PHONY:cleanclean:	rm -f *.o client server

在这里插入图片描述同时开启两个终端,分别为服务器端和客户端。客户端输入“nihao”,服务器端立刻接收到来自客户端的消息,显示为“client# nihao”,然后服务器端又向客户端发送一条消息“nihao”,客户端收到来自服务端的消息“server# nihao”,然后客户端输入“quit”,结束通信。

在这里插入图片描述在这里插入图片描述

上一篇:ipcs命令与ipcrm命令
下一篇:进程间通信

发表评论

最新留言

感谢大佬
[***.8.128.20]2025年04月12日 07时53分19秒