POSIX和SYSTEM的消息队列应该注意的问题
首先看看POSIX的代碼:
1.posix_mq_server.c
#include <mqueue.h>
#include <sys/stat.h>
#include <string.h>
#include <stdio.h>
#define MQ_FILE "/mq_test"
#define BUF_LEN 128
int main()
{
???? mqd_t mqd;
??? char buf[BUF_LEN];
??? int? por = 0;
??? int ret = 0;
??? struct mq_attr attr;
??? attr.mq_flags = 0;
??? attr.mq_maxmsg = 3;
??? attr.mq_msgsize = 50;
??? attr.mq_curmsgs= 0;
??? mqd = mq_open(MQ_FILE, O_WRONLY,0666,&attr);
??? if (-1 == mqd)
??? {
??????? printf("mq_open error.\n");
??????? return -1;
??? }
??? do{
??????? buf[BUF_LEN-1]='\0';
??????? printf("MQ_MSG : ");
??????? scanf("%s", buf);
??????? if(buf[BUF_LEN-1]!= '\0')
??????? {
??????????? continue;
??????? }
??????? printf("strlen:%d\nMQ_POR : ",strlen(buf));
??????? scanf("%d", &por);
??????? ret== mq_send(mqd, buf, strlen(buf)+1, por);
??????? if (ret != 0)
??????? {
??????????? perror("mq_send error.\n");
??????? }
??????? memset(buf,'\0',BUF_LEN);
??? }while(strcmp(buf, "quit"));
??? mq_close(mqd);
??? mq_unlink(MQ_FILE);
??? return 0;
}
2.posix_mq_client.c
#include <mqueue.h>
#include <sys/stat.h>
#include <string.h>
#include <stdio.h>
#define MQ_FILE "/mq_test"
#define BUF_LEN 128
int main()
{
??? mqd_t mqd;
??? struct mq_attr attr;
??? char buf[BUF_LEN + 1] = "quit";
??? int cnt;
??? int? por = 0;
??? attr.mq_flags = 0;
??? attr.mq_maxmsg = 128;
??? attr.mq_msgsize = 128;
??? attr.mq_curmsgs = 0;
??? //mqd = mq_open(MQ_FILE, O_RDONLY | O_CREAT, S_IRUSR | S_IWUSR, NULL);
??? mqd = mq_open(MQ_FILE, O_RDONLY | O_CREAT, 0644, &attr);
??? if (-1 == mqd)
??? {
??????? printf("mq_open error.\n");
??????? return -1;
??? }
??? do{
??????? cnt = mq_receive(mqd, buf, BUF_LEN, &por);
??????? if (0 < cnt)
??????? {
??????????? printf("mq receive : ");
??????????? fflush(stdout);
??????????? buf[cnt] = '\0';
??????????? printf("%s? por:%d\n", buf,por);
??????? }
??? }while(strcmp(buf, "quit")==0);
??? printf("\n");
??? mq_close(mqd);
??? mq_unlink(MQ_FILE);
??? return 0;
}
3.makefile
target:client? server
client: posix_mq_client.c
??? gcc posix_mq_client.c -o client -lrt
server:posix_mq_server.c
??? gcc posix_mq_server.c -o server -lrt
clean:
??? rm -f client server
??? rm -f *.o
運行make:
==[]==root@gaoke:~/code$./server
MQ_MSG : fgsdfgsdfgsdfg
MQ_POR : 9
MQ_MSG : dfgsdfgsdfg
MQ_POR : 3
MQ_MSG : dfghsdfhgjghdj
MQ_POR : 6
MQ_MSG : sdfgdgfhgjh
MQ_POR : 2
MQ_MSG : dsfghgjghjkh
MQ_POR : 8
MQ_MSG : sdfgsdfgsdfgsd
MQ_POR : 5
MQ_MSG :
==[]==root@gaoke:~/code$./client
mq receive : fgsdfgsdfgsdfg? por:9
mq receive : dsfghgjghjkh? por:8
mq receive : dfghsdfhgjghdj? por:6
mq receive : sdfgsdfgsdfgsd? por:5
mq receive : dfgsdfgsdfg? por:3
mq receive : sdfgdgfhgjh? por:2
我們發(fā)現(xiàn)POSIX是嚴格按照優(yōu)先級排序讀出的,而且先讀出作業(yè)優(yōu)先級最高的作業(yè)。
好了再看看我們的SYSTEM是如何進行的,先寫個簡單的代碼調(diào)試運行看看結(jié)果如何:
首先我先說明一下System V系統(tǒng)的消息對列對象結(jié)構(gòu):
| 01 02 03 04 05 06 07 08 09 10 11 12 13 | struct msqid_ds { ????struct ipc_perm???? msg_perm;?// 權(quán)限,跟共享內(nèi)存一樣 ????struct msg????? *msg_first;// 指向隊列的第一條消息 ????struct msg????? *msg_last;?// 指向隊列的最后一條消息 ????msglen_t??????? msg_cbytes;// 當前隊列所占字節(jié)數(shù) ????msgnum_t??????? msg_qnum;??// 當前隊列的消息數(shù) ????msglen_t??????? msg_qbytes;// 隊列允許的最大字節(jié)數(shù) ????pid_t?????????? msg_lspid;?// 最后調(diào)用msgsnd的PID ????pid_t?????????? msg_lrpid;?// 最后調(diào)用msgrcv的PID ????time_t????????? msg_stime;?// 最后調(diào)用msgsnd的時間 ????time_t????????? msg_rtime;?// 最后調(diào)用msgrcv的時間 ????time_t????????? msg_ctime;?// 最后調(diào)用msgctl的時間 } |
使用其中一個IPC機制時,系統(tǒng)內(nèi)核會維護一個ipc權(quán)限對象,用于設(shè)置讀寫權(quán)限
| 1 2 3 4 5 6 7 8 9 | struct ipc_perm { ????uid_t?? uid;???// owner’s user id ????gid_t?? gid;???????// owner’s group id ????uid_t?? cuid;??????// creator’s user id ????gid_t?? cgid;??????// creator’s group id ????mode_t? mode;??// 讀寫權(quán)限 ????ulong_t seq;???????// 序列號 ????key_t?? key;???????// IPC key } |
?
1.sys_msq_server.c
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MQ_FILE "./mq_test"
#define BUF_LEN 128
struct msgbuf {
?? long mtype;???? /*? message type, must be > 0 */
?? char mtext[256];? /*? message data */
};
int? main()
{
??? struct msqid_ds info={0};
??? struct msgbuf MSG={0};
??? key_t key = ftok(MQ_FILE,10);
??? int cnt =? msgget(key,IPC_CREAT|0666);//0:取消息隊列標識符,若不存在則函數(shù)會報錯IPC_CREAT:當msgflg&IPC_CREAT為真時,如果內(nèi)核中不存在鍵值與key相等的消息隊列,則新建一個消息隊列;如果存在這樣的消息隊列,返回此消息隊列的標識符IPC_CREAT|IPC_EXCL:如果內(nèi)核中不存在鍵值與key相等的消息隊列,則新建一個消息隊列;如果存在這樣的消息隊列則報錯
??? if(cnt == -1)
??? {
??????? perror("error!");
??? }
??? while(1){
??????? printf("enter the MSG:\n");
??????? scanf("%s",MSG.mtext);
??????? MSG.mtype = 1;
??????? //
??????? msgsnd(cnt,&MSG,strlen(MSG.mtext)+1,IPC_NOWAIT);// 最后一個參數(shù):0:當消息隊列滿時,msgsnd將會阻塞,直到消息能寫進消息隊列IPC_NOWAIT:當消息隊列已滿的時候,msgsnd函數(shù)不等待立即返回IPC_NOERROR:若發(fā)送的消息大于size字節(jié),則把該消息截斷,截斷部分將被丟棄,且不通知發(fā)送進程
??????? msgctl(cnt,IPC_STAT,&info);//IPC_STAT:獲得msgid的消息隊列頭數(shù)據(jù)到buf中IPC_SET:設(shè)置消息隊列的屬性,要設(shè)置的屬性需先存儲在buf中,可設(shè)置的屬性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes
??????? printf("uid:%d, gid = %d, cuid = %d, cgid= %d\n" , info.msg_perm.uid,? info.msg_perm.gid,? info.msg_perm.cuid,? info.msg_perm.cgid? ) ;
??????? printf("read-write:%03o, cbytes = %lu, qnum = %lu, qbytes= %lu\n" , info.msg_perm.mode&0777, info.msg_cbytes, info.msg_qnum, info.msg_qbytes ) ;
??????? system("ipcs -q");
??? }
}
?
2.sys_msq_client.c
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MQ_FILE "./mq_test"
#define BUF_LEN 128
struct msgbuf {
?? long mtype;???? /*? message type, must be > 0 */
?? char mtext[256];? /*? message data */
};
int? main()
{
??? struct msqid_ds info={0};
??? struct msgbuf MSG={0};
??? key_t key = ftok(MQ_FILE,10);
??? int cnt =? msgget(key,IPC_CREAT|0666);
??? int size =0;
??? if(cnt == -1)
??? {
??????? perror("error!");
??? }
??? while(1){
??????? size = msgrcv(cnt,&MSG,256,1,IPC_NOWAIT);
??????? if(size > 0)
??????? {
?????????????? puts(MSG.mtext);
??????? msgctl(cnt,IPC_STAT,&info);
??????? printf("uid:%d, gid = %d, cuid = %d, cgid= %d\n" ,info.msg_perm.uid,? info.msg_perm.gid,? info.msg_perm.cuid,? info.msg_perm.cgid? ) ;
??????? printf("read-write:%03o, cbytes = %lu, qnum = %lu, qbytes= %lu\n" , info.msg_perm.mode&0777, info.msg_cbytes, info.msg_qnum, info.msg_qbytes ) ;
??????? }
??? }
}
3.makefile
target:client? server
client: sys_msq_client.c
??? gcc sys_msq_client.c -o client -lrt
server:sys_msq_server.c
??? gcc sys_msq_server.c -o server -lrt
clean:
??? rm -f client server
??? rm -f *.o
然后make運行
==[]==root@gaoke:~/code$./server
enter the MSG:
aaaa
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 5, qnum = 1, qbytes= 65536
------ Message Queues --------
key??????? msqid????? owner????? perms????? used-bytes?? messages???
0xffffffff 0????????? root?????? 666??????? 5??????????? 1??????????
enter the MSG:
dddd
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 10, qnum = 2, qbytes= 65536
------ Message Queues --------
key??????? msqid????? owner????? perms????? used-bytes?? messages???
0xffffffff 0????????? root?????? 666??????? 10?????????? 2??????????
enter the MSG:
fffff
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 16, qnum = 3, qbytes= 65536
------ Message Queues --------
key??????? msqid????? owner????? perms????? used-bytes?? messages???
0xffffffff 0????????? root?????? 666??????? 16?????????? 3??????????
enter the MSG:
ggggg
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 22, qnum = 4, qbytes= 65536
------ Message Queues --------
key??????? msqid????? owner????? perms????? used-bytes?? messages???
0xffffffff 0????????? root?????? 666??????? 22?????????? 4??????????
?==[]==root@gaoke:~/code$./client
asdfasdfas
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 0, qnum = 0, qbytes= 65536
asdfasdfasdf
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 0, qnum = 0, qbytes= 65536
ssss
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 0, qnum = 0, qbytes= 65536
==[]==root@gaoke:~/code$./client
aaaa
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 17, qnum = 3, qbytes= 65536
dddd
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 12, qnum = 2, qbytes= 65536
fffff
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 6, qnum = 1, qbytes= 65536
ggggg
uid:0, gid = 0, cuid = 0, cgid= 0
read-write:666, cbytes = 0, qnum = 0, qbytes= 65536
由此我們還發(fā)現(xiàn)了什么呢?有沒有發(fā)現(xiàn)SYSTEM的消息輸入長度是固定的,然而POSIX的是可變長的。
?
SYSTEM的消息隊列函數(shù)由msgget、msgctl、msgsnd、msgrcv四個函數(shù)組成。下面的表格列出了這四個函數(shù)的函數(shù)原型及其具體說明。
1. ? msgget函數(shù)原型
| msgget(得到消息隊列標識符或創(chuàng)建一個消息隊列對象) | ||
| 所需頭文件 | #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> | |
| 函數(shù)說明 | 得到消息隊列標識符或創(chuàng)建一個消息隊列對象并返回消息隊列標識符 | |
| 函數(shù)原型 | int msgget(key_t key, int msgflg) | |
| 函數(shù)傳入值 | key | 0(IPC_PRIVATE):會建立新的消息隊列 |
| 大于0的32位整數(shù):視參數(shù)msgflg來確定操作。通常要求此值來源于ftok返回的IPC鍵值 | ||
| msgflg | 0:取消息隊列標識符,若不存在則函數(shù)會報錯 | |
| IPC_CREAT:當msgflg&IPC_CREAT為真時,如果內(nèi)核中不存在鍵值與key相等的消息隊列,則新建一個消息隊列;如果存在這樣的消息隊列,返回此消息隊列的標識符 | ||
| IPC_CREAT|IPC_EXCL:如果內(nèi)核中不存在鍵值與key相等的消息隊列,則新建一個消息隊列;如果存在這樣的消息隊列則報錯 | ||
| 函數(shù)返回值 | 成功:返回消息隊列的標識符 | |
| 出錯:-1,錯誤原因存于error中 | ||
| 附加說明 | 上述msgflg參數(shù)為模式標志參數(shù),使用時需要與IPC對象存取權(quán)限(如0600)進行|運算來確定消息隊列的存取權(quán)限 | |
| 錯誤代碼 | EACCES:指定的消息隊列已存在,但調(diào)用進程沒有權(quán)限訪問它 EEXIST:key指定的消息隊列已存在,而msgflg中同時指定IPC_CREAT和IPC_EXCL標志 ENOENT:key指定的消息隊列不存在同時msgflg中沒有指定IPC_CREAT標志 ENOMEM:需要建立消息隊列,但內(nèi)存不足 ENOSPC:需要建立消息隊列,但已達到系統(tǒng)的限制 | |
如果用msgget創(chuàng)建了一個新的消息隊列對象時,則msqid_ds結(jié)構(gòu)成員變量的值設(shè)置如下:
???????? msg_qnum、msg_lspid、msg_lrpid、 msg_stime、msg_rtime設(shè)置為0。
???????? msg_ctime設(shè)置為當前時間。
???????? msg_qbytes設(shè)成系統(tǒng)的限制值。
???????? msgflg的讀寫權(quán)限寫入msg_perm.mode中。
???????? msg_perm結(jié)構(gòu)的uid和cuid成員被設(shè)置成當前進程的有效用戶ID,gid和cuid成員被設(shè)置成當前進程的有效組ID。
2. ? msgctl函數(shù)原型
| msgctl (獲取和設(shè)置消息隊列的屬性) | ||
| 所需頭文件 | #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> | |
| 函數(shù)說明 | 獲取和設(shè)置消息隊列的屬性 | |
| 函數(shù)原型 | int msgctl(int msqid, int cmd, struct msqid_ds *buf) | |
| 函數(shù)傳入值 | msqid | 消息隊列標識符 |
| cmd ? | IPC_STAT:獲得msgid的消息隊列頭數(shù)據(jù)到buf中 | |
| IPC_SET:設(shè)置消息隊列的屬性,要設(shè)置的屬性需先存儲在buf中,可設(shè)置的屬性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes | ||
| buf:消息隊列管理結(jié)構(gòu)體,請參見消息隊列內(nèi)核結(jié)構(gòu)說明部分 | ||
| 函數(shù)返回值 | 成功:0 | |
| 出錯:-1,錯誤原因存于error中 | ||
| 錯誤代碼 | EACCESS:參數(shù)cmd為IPC_STAT,確無權(quán)限讀取該消息隊列 EFAULT:參數(shù)buf指向無效的內(nèi)存地址 EIDRM:標識符為msqid的消息隊列已被刪除 EINVAL:無效的參數(shù)cmd或msqid EPERM:參數(shù)cmd為IPC_SET或IPC_RMID,卻無足夠的權(quán)限執(zhí)行 | |
3. ? msgsnd函數(shù)原型
| msgsnd (將消息寫入到消息隊列) | ||
| 所需頭文件 | #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> | |
| 函數(shù)說明 | 將msgp消息寫入到標識符為msqid的消息隊列 | |
| 函數(shù)原型 | int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg) | |
| 函數(shù)傳入值 | msqid | 消息隊列標識符 |
| msgp | 發(fā)送給隊列的消息。msgp可以是任何類型的結(jié)構(gòu)體,但第一個字段必須為long類型,即表明此發(fā)送消息的類型,msgrcv根據(jù)此接收消息。msgp定義的參照格式如下: ? ??struct s_msg{ /*msgp定義的參照格式*/ | |
| msgsz | 要發(fā)送消息的大小,不含消息類型占用的4個字節(jié),即mtext的長度 | |
| msgflg | 0:當消息隊列滿時,msgsnd將會阻塞,直到消息能寫進消息隊列 | |
| IPC_NOWAIT:當消息隊列已滿的時候,msgsnd函數(shù)不等待立即返回 | ||
| IPC_NOERROR:若發(fā)送的消息大于size字節(jié),則把該消息截斷,截斷部分將被丟棄,且不通知發(fā)送進程。 | ||
| 函數(shù)返回值 | 成功:0 | |
| 出錯:-1,錯誤原因存于error中 | ||
| 錯誤代碼 | EAGAIN:參數(shù)msgflg設(shè)為IPC_NOWAIT,而消息隊列已滿 EIDRM:標識符為msqid的消息隊列已被刪除 EACCESS:無權(quán)限寫入消息隊列 EFAULT:參數(shù)msgp指向無效的內(nèi)存地址 EINTR:隊列已滿而處于等待情況下被信號中斷 EINVAL:無效的參數(shù)msqid、msgsz或參數(shù)消息類型type小于0 | |
?? msgsnd()為阻塞函數(shù),當消息隊列容量滿或消息個數(shù)滿會阻塞。消息隊列已被刪除,則返回EIDRM錯誤;被信號中斷返回E_INTR錯誤。
?如果設(shè)置IPC_NOWAIT消息隊列滿或個數(shù)滿時會返回-1,并且置EAGAIN錯誤。
msgsnd()解除阻塞的條件有以下三個條件:
①??? 不滿足消息隊列滿或個數(shù)滿兩個條件,即消息隊列中有容納該消息的空間。
②??? msqid代表的消息隊列被刪除。
③??? 調(diào)用msgsnd函數(shù)的進程被信號中斷。
4. ? msgrcv函數(shù)原型
| msgrcv (從消息隊列讀取消息) | ||
| 所需頭文件 | #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> | |
| 函數(shù)說明 | 從標識符為msqid的消息隊列讀取消息并存于msgp中,讀取后把此消息從消息隊列中刪除 | |
| 函數(shù)原型 | ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, ????????????????????? int msgflg); | |
| 函數(shù)傳入值 | msqid | 消息隊列標識符 |
| msgp | 存放消息的結(jié)構(gòu)體,結(jié)構(gòu)體類型要與msgsnd函數(shù)發(fā)送的類型相同 | |
| msgsz | 要接收消息的大小,不含消息類型占用的4個字節(jié) | |
| msgtyp | 0:接收第一個消息 | |
| >0:接收類型等于msgtyp的第一個消息 | ||
| <0:接收類型等于或者小于msgtyp絕對值的第一個消息 | ||
| msgflg | 0: 阻塞式接收消息,沒有該類型的消息msgrcv函數(shù)一直阻塞等待 | |
| IPC_NOWAIT:如果沒有返回條件的消息調(diào)用立即返回,此時錯誤碼為ENOMSG | ||
| IPC_EXCEPT:與msgtype配合使用返回隊列中第一個類型不為msgtype的消息 | ||
| IPC_NOERROR:如果隊列中滿足條件的消息內(nèi)容大于所請求的size字節(jié),則把該消息截斷,截斷部分將被丟棄 | ||
| 函數(shù)返回值 | 成功:實際讀取到的消息數(shù)據(jù)長度 | |
| 出錯:-1,錯誤原因存于error中 | ||
| 錯誤代碼 | E2BIG:消息數(shù)據(jù)長度大于msgsz而msgflag沒有設(shè)置IPC_NOERROR EIDRM:標識符為msqid的消息隊列已被刪除 EACCESS:無權(quán)限讀取該消息隊列 EFAULT:參數(shù)msgp指向無效的內(nèi)存地址 ENOMSG:參數(shù)msgflg設(shè)為IPC_NOWAIT,而消息隊列中無消息可讀 EINTR:等待讀取隊列內(nèi)的消息情況下被信號中斷 | |
msgrcv()解除阻塞的條件有以下三個:
①??? 消息隊列中有了滿足條件的消息。
②??? msqid代表的消息隊列被刪除。
③??? 調(diào)用msgrcv()的進程被信號中斷。
?
消息隊列使用程序范例
?
總結(jié)
以上是生活随笔為你收集整理的POSIX和SYSTEM的消息队列应该注意的问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最近准备学习下mongodb(一 Win
- 下一篇: HistCite 的使用方法