POSIX 消息队列函数(mq_open、mq_getattr、mq_send、mq_receive)示例
(2016-03-17 16:52:46)需要注意以下几点:
1、消息队列的名字只能以一个 '/'开头,名字中不能包含其他的'/'
2、mq_receive() 的第三个参数表示读取消息的长度,不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的
mq_attr 结构中 mq_msgsize 的大小。
3、消息的优先级:它是一个小于 MQ_PRIO_MAX 的数,数值越大,优先级越高。 POSIX 消息队列在调用
mq_receive 时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在 mq_send 是置
msg_prio 为 0, mq_receive 的 msg_prio 置为 NULL。
4、默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK,如:
struct mq_attr new_attr;
mq_getattr(mqID, &new_attr);//获取当前属性
new_attr.mq_flags = O_NONBLOCK;//设置为非阻塞
mq_setattr(mqID, &new_attr, NULL)//设置属性
代码如下:
mq_posix_send:
#include "unistd.h"
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "mqueue.h"
#include "errno.h"
#include "pthread.h"
int thread_send_id;
int thread_redv_id;
pthread_t thread_send;
pthread_t thread_recv;
pthread_attr_t attr_t;
#define MAXNUM 30
int msgid = -1;
struct msg_st data_send;
struct msg_st data_recv;
#define MAX_TEXT 512
struct msg_st
{
int id_recv;//接收进程id
};
const char *progs[5]={"", "recv1", "recv2", "recv3",
"send"};
int running;
mqd_t mqID_send[3], mqID_recv;
struct mq_attr mq_attr_send, mq_attr_recv;
#define mqName_send "/send"
#define mqName_recv1 "/recv1"
#define mqName_recv2 "/recv2"
#define mqName_recv3 "/recv3"
//发送
void *send()
{
int sendNum = 0;
while (running)
{
sendNum++;
data_send.id_recv = ((sendNum-1)%3)+1;//1~3循环
data_send.id_send = 4;
sprintf(data_send.text, "%s -> %s -- [%d]",
progs[data_send.id_send], progs[data_send.id_recv], sendNum);
if (sendNum > (MAXNUM-3))//给每个进程的最后一条消息
{
char tmptext[100];
memcpy(tmptext, data_send.text, 100);
sprintf(data_send.text, "%s...%s", tmptext, "end");
}
//向队列发送数据
if (mq_send(mqID_send[data_send.id_recv-1], (char
*)&data_send, sizeof(data_send), 0) < 0)
{
fprintf(stderr, "mq_send failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "[send]%d: %d->%d, %s\n", sendNum,
data_send.id_send, data_send.id_recv, data_send.text);//打印发送
if (sendNum == MAXNUM)
{
break;
}
sleep(1);
}
pthread_exit((void *)0);
}
//接收
void *recv()
{
int recvNum = 0;
while (running)
{
if (mq_receive(mqID_recv, (char *)&data_recv,
mq_attr_recv.mq_msgsize, NULL) < 0)
{
fprintf(stderr, "mq_receive failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
recvNum++;
fprintf(stderr, "[recv]%d: %d->%d, %s\n", recvNum,
data_recv.id_send, data_recv.id_recv, data_recv.text);//打印接收
if (recvNum == MAXNUM)
{
int rc = 0;
//关闭
rc = mq_close(mqID_recv);
if(0 != rc)
{
fprintf(stderr, "mq_close failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
//删除
rc = mq_unlink(mqName_send);
if(0 != rc)
{
fprintf(stderr, "mq_close failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
running = 0;
}
}
pthread_exit((void *)0);
}
int main()
{
//建立消息队列
mqID_send[0] = mq_open(mqName_recv1, O_WRONLY | O_CREAT, 0666,
NULL);//写
if (mqID_send[0] < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv1,
strerror(errno));
exit(EXIT_FAILURE);
}
mqID_send[1] = mq_open(mqName_recv2, O_WRONLY | O_CREAT, 0666,
NULL);//写
if (mqID_send[1] < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv2,
strerror(errno));
exit(EXIT_FAILURE);
}
mqID_send[2] = mq_open(mqName_recv3, O_WRONLY | O_CREAT, 0666,
NULL);//写
if (mqID_send[2] < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv3,
strerror(errno));
exit(EXIT_FAILURE);
}
mqID_recv = mq_open(mqName_send, O_RDONLY | O_CREAT, 0666,
NULL);//读
if (mqID_recv < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_send,
strerror(errno));
exit(EXIT_FAILURE);
}
//获得属性
mq_getattr(mqID_recv, &mq_attr_recv);
{
fprintf(stderr, "recv.mq_maxmsg=%ld\n",
mq_attr_recv.mq_maxmsg);//队列所允许的最大消息条数
fprintf(stderr, "recv.mq_msgsize=%ld\n",
mq_attr_recv.mq_msgsize);//每条消息的最大字节数
fprintf(stderr, "recv.mq_curmsgs=%ld\n",
mq_attr_recv.mq_curmsgs);//队列当前的消息条数
}
running = 1;
//发送
while((thread_send_id=pthread_create(&thread_send,
&attr_t, send, NULL))!=0)
{
sleep(1);
}
//接收
while((thread_redv_id=pthread_create(&thread_recv,
&attr_t, recv, NULL))!=0)
{
sleep(1);
}
while (running)
{
sleep(1);
}
exit(EXIT_SUCCESS);
}
mq_posix_recv1:
#include "unistd.h"
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "mqueue.h"
#include "errno.h"
#include "pthread.h"
#define MAXNUM 10
int msgid = -1;
struct msg_st data_send;
struct msg_st data_recv;
#define MAX_TEXT 512
struct msg_st
{
int id_recv;//接收进程id
};
const char *progs[5]={"", "recv1", "recv2", "recv3",
"send"};
mqd_t mqID_send, mqID_recv;
struct mq_attr mq_attr_send, mq_attr_recv;
#define mqName_send "/send"
#define mqName_recv1 "/recv1"
#define mqName_recv2 "/recv2"
#define mqName_recv3 "/recv3"
//接收发送消息
void dealMsg()
{
int recvNum = 0;
int sendNum = 0;
//从队列中获取消息,直到遇到end消息为止
while (recvNum < MAXNUM)
{
//接收
if (mq_receive(mqID_recv, (char *)&data_recv,
mq_attr_recv.mq_msgsize, NULL) < 0)
{
fprintf(stderr, "mq_receive failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
recvNum++;
fprintf(stderr, "[recv]%d: %d->%d, %s\n", recvNum,
data_recv.id_send, data_recv.id_recv, data_recv.text);//打印接收
//发送
sendNum++;
data_send.id_recv = 4;
data_send.id_send = 1;
sprintf(data_send.text, "%s -> %s -- [%d]",
progs[data_send.id_send], progs[data_send.id_recv], sendNum);
if (recvNum == MAXNUM)
{
char tmptext[100];
memcpy(tmptext, data_send.text, 100);
sprintf(data_send.text, "%s...%s", tmptext, "end");
}
//向队列发送数据
if (mq_send(mqID_send, (char *)&data_send,
sizeof(data_send), 0) < 0)
{
fprintf(stderr, "mq_send failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "[send]%d: %d->%d, %s\n", sendNum,
data_send.id_send, data_send.id_recv, data_send.text);//打印发送
//关闭和删除消息队列
if (recvNum == MAXNUM)
{
int rc = 0;
rc = mq_close(mqID_recv);
if(0 != rc)
{
fprintf(stderr, "mq_close failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
rc = mq_unlink(mqName_recv1);
if(0 != rc)
{
fprintf(stderr, "mq_close failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
}
}
}
int main()
{
//建立消息队列
mqID_send = mq_open(mqName_send, O_WRONLY | O_CREAT, 0666,
NULL);//写
if (mqID_send < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv2,
strerror(errno));
exit(EXIT_FAILURE);
}
mqID_recv = mq_open(mqName_recv1, O_RDONLY | O_CREAT, 0666,
NULL);//读
if (mqID_recv < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv1,
strerror(errno));
exit(EXIT_FAILURE);
}
//获得属性
mq_getattr(mqID_recv, &mq_attr_recv);
{
fprintf(stderr, "recv.mq_maxmsg=%ld\n",
mq_attr_recv.mq_maxmsg);//队列所允许的最大消息条数
fprintf(stderr, "recv.mq_msgsize=%ld\n",
mq_attr_recv.mq_msgsize);//每条消息的最大字节数
fprintf(stderr, "recv.mq_curmsgs=%ld\n",
mq_attr_recv.mq_curmsgs);//队列当前的消息条数
}
dealMsg();
exit(EXIT_SUCCESS);
}
mq_posix_recv2:
#include "unistd.h"
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "mqueue.h"
#include "errno.h"
#include "pthread.h"
#define MAXNUM 10
int msgid = -1;
struct msg_st data_send;
struct msg_st data_recv;
#define MAX_TEXT 512
struct msg_st
{
int id_recv;//接收进程id
};
const char *progs[5]={"", "recv1", "recv2", "recv3",
"send"};
mqd_t mqID_send, mqID_recv;
struct mq_attr mq_attr_send, mq_attr_recv;
#define mqName_send "/send"
#define mqName_recv1 "/recv1"
#define mqName_recv2 "/recv2"
#define mqName_recv3 "/recv3"
//接收发送消息
void dealMsg()
{
int recvNum = 0;
int sendNum = 0;
//从队列中获取消息,直到遇到end消息为止
while (recvNum < MAXNUM)
{
//接收
if (mq_receive(mqID_recv, (char *)&data_recv,
mq_attr_recv.mq_msgsize, NULL) < 0)
{
fprintf(stderr, "mq_receive failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
recvNum++;
fprintf(stderr, "[recv]%d: %d->%d, %s\n", recvNum,
data_recv.id_send, data_recv.id_recv, data_recv.text);//打印接收
//发送
sendNum++;
data_send.id_recv = 4;
data_send.id_send = 2;
sprintf(data_send.text, "%s -> %s -- [%d]",
progs[data_send.id_send], progs[data_send.id_recv], sendNum);
if (recvNum == MAXNUM)
{
char tmptext[100];
memcpy(tmptext, data_send.text, 100);
sprintf(data_send.text, "%s...%s", tmptext, "end");
}
//向队列发送数据
if (mq_send(mqID_send, (char *)&data_send,
sizeof(data_send), 0) < 0)
{
fprintf(stderr, "mq_send failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "[send]%d: %d->%d, %s\n", sendNum,
data_send.id_send, data_send.id_recv, data_send.text);//打印发送
//关闭和删除消息队列
if (recvNum == MAXNUM)
{
int rc = 0;
rc = mq_close(mqID_recv);
if(0 != rc)
{
fprintf(stderr, "mq_close failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
rc = mq_unlink(mqName_recv2);
if(0 != rc)
{
fprintf(stderr, "mq_close failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
}
}
}
int main()
{
//建立消息队列
mqID_send = mq_open(mqName_send, O_WRONLY | O_CREAT, 0666,
NULL);//写
if (mqID_send < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv2,
strerror(errno));
exit(EXIT_FAILURE);
}
mqID_recv = mq_open(mqName_recv2, O_RDONLY | O_CREAT, 0666,
NULL);//读
if (mqID_recv < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv1,
strerror(errno));
exit(EXIT_FAILURE);
}
//获得属性
mq_getattr(mqID_recv, &mq_attr_recv);
{
fprintf(stderr, "recv.mq_maxmsg=%ld\n",
mq_attr_recv.mq_maxmsg);//队列所允许的最大消息条数
fprintf(stderr, "recv.mq_msgsize=%ld\n",
mq_attr_recv.mq_msgsize);//每条消息的最大字节数
fprintf(stderr, "recv.mq_curmsgs=%ld\n",
mq_attr_recv.mq_curmsgs);//队列当前的消息条数
}
dealMsg();
exit(EXIT_SUCCESS);
}
mq_posix_recv3:
#include "unistd.h"
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "mqueue.h"
#include "errno.h"
#include "pthread.h"
#define MAXNUM 10
int msgid = -1;
struct msg_st data_send;
struct msg_st data_recv;
#define MAX_TEXT 512
struct msg_st
{
int id_recv;//接收进程id
};
const char *progs[5]={"", "recv1", "recv2", "recv3",
"send"};
mqd_t mqID_send, mqID_recv;
struct mq_attr mq_attr_send, mq_attr_recv;
#define mqName_send "/send"
#define mqName_recv1 "/recv1"
#define mqName_recv2 "/recv2"
#define mqName_recv3 "/recv3"
//接收发送消息
void dealMsg()
{
int recvNum = 0;
int sendNum = 0;
//从队列中获取消息,直到遇到end消息为止
while (recvNum < MAXNUM)
{
//接收
if (mq_receive(mqID_recv, (char *)&data_recv,
mq_attr_recv.mq_msgsize, NULL) < 0)
{
fprintf(stderr, "mq_receive failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
recvNum++;
fprintf(stderr, "[recv]%d: %d->%d, %s\n", recvNum,
data_recv.id_send, data_recv.id_recv, data_recv.text);//打印接收
//发送
sendNum++;
data_send.id_recv = 4;
data_send.id_send = 3;
sprintf(data_send.text, "%s -> %s -- [%d]",
progs[data_send.id_send], progs[data_send.id_recv], sendNum);
if (recvNum == MAXNUM)
{
char tmptext[100];
memcpy(tmptext, data_send.text, 100);
sprintf(data_send.text, "%s...%s", tmptext, "end");
}
//向队列发送数据
if (mq_send(mqID_send, (char *)&data_send,
sizeof(data_send), 0) < 0)
{
fprintf(stderr, "mq_send failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "[send]%d: %d->%d, %s\n", sendNum,
data_send.id_send, data_send.id_recv, data_send.text);//打印发送
//关闭和删除消息队列
if (recvNum == MAXNUM)
{
int rc = 0;
rc = mq_close(mqID_recv);
if(0 != rc)
{
fprintf(stderr, "mq_close failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
rc = mq_unlink(mqName_recv3);
if(0 != rc)
{
fprintf(stderr, "mq_close failed: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
}
}
}
int main()
{
//建立消息队列
mqID_send = mq_open(mqName_send, O_WRONLY | O_CREAT, 0666,
NULL);//写
if (mqID_send < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv2,
strerror(errno));
exit(EXIT_FAILURE);
}
mqID_recv = mq_open(mqName_recv3, O_RDONLY | O_CREAT, 0666,
NULL);//读
if (mqID_recv < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv1,
strerror(errno));
exit(EXIT_FAILURE);
}
//获得属性
mq_getattr(mqID_recv, &mq_attr_recv);
{
fprintf(stderr, "recv.mq_maxmsg=%ld\n",
mq_attr_recv.mq_maxmsg);//队列所允许的最大消息条数
fprintf(stderr, "recv.mq_msgsize=%ld\n",
mq_attr_recv.mq_msgsize);//每条消息的最大字节数
fprintf(stderr, "recv.mq_curmsgs=%ld\n",
mq_attr_recv.mq_curmsgs);//队列当前的消息条数
}
dealMsg();
exit(EXIT_SUCCESS);
}
前一篇:system V消息队列相关问题
后一篇:POSIX 消息队列相关问题