
本文共 12065 字,大约阅读时间需要 40 分钟。
消息队列是内核地址空间中的内部链表,通过Linux内核在各个进程之间传递内容,消息顺序地发送到消息队列中,并且以几种不同的方式从队列中获取,每个消息队列可以用IPC标识符唯一的进行标识,内核中的消息队列是通过IPC的标识符来区别的,不同的消息队列之间是相互独立的,每个消息队列中的消息又构成一个独立的链表。
消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型地数据结构。我们可以通过发送消息来避免命名管道地同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向消息队列中按照一定规则添加新消息;对消息队列有读权限的进程可以从消息队列中读走消息。消息队列是随内核持续地。 消息队列与管道不同的是,消息队列是基于消息的,而管道是基于字节流的,且消息队列的读取不一定是先入先出。 消息队列也有类似管道一样的不足之处,就是每个消息的最大长度是有上限的(MSGMAX),通常为8192字节,每个消息队列的总的字节数也是有上限的(MSGMNB),系统上消息队列的总数也有一个上限(MSGMNI)。消息队列的原理
MQ(Message Queue,消息队列)传递的是消息,消息即是我们需要在进程间传递的数据。MQ采用链表来实现消息队列,该链表是由系统内核进行维护,系统中可能有很多的MQ,每个MQ用消息队列描述符(qid)来区分,qid是唯一的,用来区分不同的MQ。在进行进程间通信时,一个进程将消息加到MQ尾端,另一个进程从消息队列中读取消息(不一定以先进先出的规则来读取消息,可以按照消息类型字段读取消息),这样就实现了进程间的通信。如下模型所示:
#includestruct msgbuf{ long mtype; //消息类型 char mtext[1]; //消息数据};
在结构体msgbuf中有两个数据成员:
mtype:消息类型,以正数来表示。用户可以给某个消息设定一个类型,可以在消息队列中正确地发送和接收自己的消息。例如,在socket编程过程中,一个服务器可以接受多个客户端的连接,可以为每个客户端设定一个消息类型,服务器和客户端之间的通信可以通过此消息类型来发送和接收消息,并且多个客户端之间通过消息类型来区分。 mtext:消息数据。类型为char,长度为1。在构建自己的消息结构时,这个域能存放任意形式的任意数据,应用程序编程人员可以重新定义msgbuf结构。例如:#includestruct msgbuf{ long mtype; //消息类型 char mtext[10]; //消息数据 long length;};
此上这个结构体定义与系统定义不一致,但是mtype是一致的。消息在通过内核在进程之间收发时,内核不对mtext域进行转换,任意的消息都可以发送。具体的转换工作是在应用程序之间进行的。但是,消息的大小,存在一个内部的限制。在Linux中,消息的大小在Linux/msg.h中定义为:
#define MSGMAX 8192 消息的大小不能超过8192个字节,这其中包括mtype成员,它的长度是4个字节(long类型)。 从消息的结构来看,其在两方面受到制约:首先,它必须小于系统规定的上限值;其次,它必须以long int长整数开始,接收者函数将利用这个长整数确定消息的类型。2.msqid_ds结构
Linux内核中,每个消息队列都维护一个结构体,内核msqid_ds结构——IPC对象分为3类,每一类都有一个内部数据结构,该数据结构是由内核维护的。对于消息队列而言,它的内部数据结构是msqid_ds结构,此结构体保存着消息队列当前状态信息。对于系统上创建的每个消息队列,内核均为其创建、存储和维护该结构的一个实例。该结构在linux/msg.h中定义,如下:#includestruct msqid_ds{ struct ipc_perm msg_perm; time_t msg_stime; //发送队列的最后一个时间戳 time_t msg_rtime; //从队列中获取的最后一个消息的时间戳 time_t msg_ctime; //对队列进行最后一次变动的时间戳 unsigned long msg_cbytes; //在队列上所驻留的字节总数 msgqnum_t msg_qnum; //当前处于队列中的消息数目 msglen_t msg_qbytes; //队列中能容纳的字节的最大数目 pid_t msg_lspid; //发送最后一个消息进程的PID pid_t msg_lrpid; //接收最后一个消息进程的PID};
msg_perm:它是ipc_perm结构的一个实例,ipc_perm结构是在linux/ipc.h中定义的,用于存放消息队列的许可权限信息,其中包括访问许可信息,以及队列创建者的有关信息。
3.ipc_perm结构
系统为每一个IPC对象保存一个ipc_perm结构体,内核把IPC对象的许可权限信息存放在ipc_perm类型的结构中。例如在前面描述的某个消息队列的内部结构中,msg_perm成员就是ipc_perm类型的,它的定义是存储在文件linux/ipc.h中,如下:#includestruct ipc_perm{ key_t key; //函数msgget()使用的键值 uid_t uid; //用户的UID gid_t gid; //用户的GID uid_t cuid; //建立者的UID gid_t cgid; //建立者的GID unsigned short mode; //权限 unsigned short seq; //序列号};
消息队列在内核中的表示:


#include#include key_t ftok(const char* pathname,int project_id);
其中pathname是已经存在的目录,一般情况下取根目录下的tmp文件(“/tmp”),而project_id是一个8位的值,通常用a、b等表示。如果创建失败,返回-1。
#include#include int main(){ key_t key; char* msgpath="/tmp"; key=ftok(msgpath,'a'); if(key != -1) { printf("ox%x\n",key); printf("键值成功建立!\n"); } else { printf("键值建立失败!\n"); return -1; } return 0;}
*2.msgget()函数:*创建一个新的消息队列,或访问一个现有的消息队列。
#include#include #include int msgget(key_t key,int msgflg);
msgget()函数的第一个参数是键值,可以用ftok()函数生成,此键值将被拿来与内核中其他消息队列的现有关键字值相比较。比较之后,打开或者访问操作依赖于msgflg参数的内容。
IPC_CREAT:如果在内核中不存在该消息队列,则创建;如果存在,则打开。 IPC_EXCL:通常与IPC_CREAT参数一起使用,如果消息队列不存在则创建,如果存在则出错返回。示例:创建一个消息队列,名字为1234,接着再打开这个消息队列。
#include#include #include #include #include #include int main(){ int msqid; msqid=msgget(1234,IPC_CREAT | 0666); //键值可以直接指定也可以通过ftok()函数生成,flag参数也可以结合权限使用 if(msqid == -1) { perror("msgget"); return -1; } else { printf("消息队列创建成功!\n"); } msqid=msgget(1234,0); //flag=0表示以原有权限打开,即666 printf("msqid=%d\n",msqid); return 0;}
ox4d2即十六进制的1234。
*3.msgsnd()函数:*把一条消息添加到消息队列中。
函数原型:#include#include #include int msgsend(int msqid,void* msgp,size_t msgsz,int msgflag);
此函数若执行成功,返回0,失败返回-1。此函数的第一个参数为队列标识符,是调用msgget()函数获得的返回值;第二个参数是msgp,是一个void类型的指针,指向一个消息缓冲区,发送的消息必须是一个结构体;第三个参数是msgsz,包含这消息的大小,以字节为单位,不包括消息类型的长度的4个字节;第四个参数是flag标志,控制着当前消息队列满或者达到系统上限时将要发生的事情,一般设置为0,表示忽略,也可以设置为IPC_NOWAIT,表示队列满不等待,返回EAGAIN错误。如果消息队列已满,则消息将不会被写入队列中,如果没有指定IPC_NOWAIT,则调用进程将被阻塞(中断),直到可以写消息为止。
*4.msgrcv()函数:*从一个消息队列接收消息
当获得队列标识符后,用户就可以在该消息队列上执行消息队列的接收操作。msgrcv()函数用于介绍队列标识符中的消息,原型为:#include#include #include ssize_t msgrcv(int msqid,void* msgp,size_t msgsz,long msgtype,int msgflag);
函数执行成功,返回实际放到接收缓冲区里去的字符个数,失败返回-1。
msqid:由msgget()函数返回的消息队列标识码 msgp:指针,指向准备接收的消息 msgsz:是msgp之乡的消息缓冲区结构的大小,不包括保存消息类型所谓那个long int长整形的长度 msgtype:指定要从队列中获取的消息类型,当msgtype=0时,返回消息队列第一条消息;当msgtype>0时,返回消息队列第一条消息类型等于msgtype的消息;当msgtype<0时,返回队列第一条消息类型小于等于msgtype绝对值的消息,并且是满足条件的消息类型最小的消息。 msgflag:控制着队列中没有相应类型的消息可供接收时将要发生的事情,当msgflag=IPC_NOWAIT时,表示队列没有可读消息,不等待,返回ENOMSG错误;当msgflag=MSG_NOERROR时,表示消息大小超过msgsz时被截断;当msgtype>0且msgflag-MSG_EXCEPT时,接收类型不等于msgtype的第一条消息。*5.msgctl()函数:*消息队列的控制函数
#include#include #include int msgctl(int msgid,int cmd,struct msqid_ds* buf);
函数执行成功,返回0,失败返回-1。
msqid:由msgget()函数返回的消息队列标识码 cmd:将要采取的动作,取值有三个 IPC_STAT 把msqid_ds结构中的数据设置为消息队列的当前关联值 IPC_SET 在进程由足够权限的前提下,把消息队列的当前关联值设置为msqid_ds 数据结构中给出的值 IPC_RMID 删除消息队列 buf:消息队列的结构示例1:删除消息队列
msgctl.c#include#include #include #include #include #include #include int main(){ int msgid; msgid=msgget(1234,0666 | IPC_CREAT); if(msgid == -1) { printf("创建消息队列失败!\n"); return -1; } else { printf("消息队列创建成功!\n"); msgid=msgget(1234,0); printf("msgid=%d\n",msgid); } //msgctl(msgid,IPC_RMID,NULL); return 0;}
先将控制函数屏蔽掉运行文件,使其成功创建消息队列,成功如下图:


msgctl_set.c
#include#include #include #include #include #include int main(){ int msgid; msgid=msgget(1234,0666 | IPC_CREAT); if(msgid == -1) { printf("消息队列创建失败!\n"); return -1; } else { printf("消息队列创建成功!\n"); msgid=msgget(1234,0); printf("msgid=%d\n",msgid); } struct msqid_ds buf; msgctl(msgid,IPC_STAT,&buf); printf("权限:%o\n",buf.msg_perm.mode); sscanf("600","%o",(unsigned int*)&buf.msg_perm.mode); msgctl(msgid,IPC_SET,&buf); return 0;}
创建时权限为666,之后修改为600

#include "../Head.c"//消息缓冲区结构struct msgbuff{ long mtype; char mtext[100];};int main(){ //创建私有消息队列 int msgid=msgget(IPC_PRIVATE,IPC_CREAT | 0777); //如果创建失败 if(msgid == -1) { printf("创建消息队列失败!\n"); } //复制进程 int cid=fork(); //父进程 if(cid>0) { msgbuff msg={1,"Hello!"}; //发送消息 msgsnd(msgid,&msg,sizeof(msg),IPC_NOWAIT); sleep(2); //接收消息 msgrcv(msgid,&msg,sizeof(msg),1,IPC_NOWAIT); printf("%s\n",msg.mtext); } //子进程 else if(cid == 0) { //子进程先接收消息 msgbuff msg; msgrcv(msgid,&msg,sizeof(msg),1,IPC_NOWAIT); printf("%s\n",msg.mtext); strcpy(msg.mtext,"hello,too!"); msgsnd(msgid,&msg,sizeof(msg),IPC_NOWAIT); } return 0;}
示例:使用消息队列来进行进程间通信。进程A写入字符串数据到消息队列中,进程B从消息队列中读取数据。
#include "../Head.c"struct msgbuf{ long mtype; char mtext[255];};int main(){ //创建消息队列,用key=123来唯一标识此队列 int msgid=msgget(123,IPC_CREAT | 0666); //如果消息队列创建成功 if(msgid != -1) { //初始化要发送的消息 struct msgbuf sendbuf; sendbuf.mtype=1; strcpy(sendbuf.mtext,"我是发送进程,用来写入并发送数据!\n"); //开始发送消息 if(msgsnd(msgid,&sendbuf,sizeof(sendbuf.mtext),0)) { printf("消息发送成功!\n"); } else { perror("msgsnd:"); } } else { perror("msgget:"); } return 0;}
read_msg.c
#include "../Head.c"struct msgbuf{ long mtype; char mtext[255];};int main(){ //获取消息队列 int msgid=msgget(123,IPC_CREAT | 0666); if(msgid != -1) { struct msgbuf recvbuf; //接收第一条消息,存到recvbuf中 if(msgrcv(msgid,&recvbuf,sizeof(recvbuf.mtext),0,IPC_NOWAIT) != -1) { printf("读取到的消息为:%s",recvbuf.mtext); //接收到消息之后就删除此队列 if(msgctl(msgid,IPC_RMID,0) != -1) { printf("删除队列成功!\n"); } else { perror("msgctl:"); } } else { perror("msgrcv:"); } } else { perror("msgget:"); } return 0;}
综合应用
comm.h
#ifndef _COMM_H_#define _COMM_H#define SERVER_TYPE 1#define CLIENT_TYPE 2#include "../Head.c"struct msgbuf{ long mtype; char mtext[1024];};int CreatMessageQueue();int GetMessageQueue();int DestoryMessageQueue(int msgid);int SengMessageQueue(int msgid,int who,char*msg);int ReceiveMessageQueue(int msgid,int receType,char out[]);#endif
comm.c
#include "comm.h"static int CommandMessageQueue(int flag){ key_t key=ftok("/tmp",0x6666); if(key<0) { perror("ftok:"); return -1; } int msgid=msgget(key,flag); if(msgid<0) { perror("msgget:"); } return msgid;}int CreatMessageQueue(){ return CommandMessageQueue(IPC_CREAT | IPC_EXCL | 0666);}int GetMessageQueue(){ return CommandMessageQueue(IPC_CREAT);}int DestoryMessageQueue(int msgid){ if(msgctl(msgid,IPC_RMID,NULL) < 0) { perror("msgctl:"); return -1; } return 0;}int SendMessageQueue(int msgid,int who,char* msg){ struct msgbuf sendbuf; sendbuf.mtype=who; strcpy(sendbuf.mtext,msg); if(msgsnd(msgid,(void*)&sendbuf,sizeof(sendbuf.mtext),0) < 0) { perror("msgsnd:"); return -1; } return 0;}int ReceiveMessageQueue(int msgid,int recvType,char out[]){ struct msgbuf recvbuf; int size=sizeof(recvbuf.mtext); if(msgrcv(msgid,(void*)&recvbuf,size,recvType,0) < 0) { perror("msgrcv:"); return -1; } strncpy(out,recvbuf.mtext,size); out[size]=0; return 0;}
server.c
#include "comm.h"int main(){ int msgid=CreatMessageQueue(); char buf[1024]={0}; while(1) { ReceiveMessageQueue(msgid,CLIENT_TYPE,buf); if(strcasecmp("quit",buf) == 0) { break; } printf("client# %s\n",buf); printf("请输入:"); fflush(stdout); ssize_t s=read(0,buf,sizeof(buf)); if(s>0) { buf[s-1]=0; SendMessageQueue(msgid,SERVER_TYPE,buf); printf("发送完毕,等待接收...\n"); } } DestoryMessageQueue(msgid); return 0;}
client.c
#include "comm.h"int main(){ int msgid =GetMessageQueue(); char buf[1024]={0}; while(1) { printf("请输入:"); fflush(stdout); ssize_t s=read(0,buf,sizeof(buf)); if(s>0) { buf[s-1]=0; SendMessageQueue(msgid,CLIENT_TYPE,buf); if(strcasecmp("quit",buf) == 0) { break; } printf("发送完毕,等待接收...\n"); } ReceiveMessageQueue(msgid,SERVER_TYPE,buf); printf("server# %s\n",buf); } return 0;}
Makefile
.PHONY:allall:client serverclient:client.c comm.c gcc -o $@ $^server:server.c comm.c gcc -o $@ $^.PHONY:cleanclean: rm -f *.o client server
同时开启两个终端,分别为服务器端和客户端。客户端输入“nihao”,服务器端立刻接收到来自客户端的消息,显示为“client# nihao”,然后服务器端又向客户端发送一条消息“nihao”,客户端收到来自服务端的消息“server# nihao”,然后客户端输入“quit”,结束通信。


发表评论
最新留言
关于作者
