加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

POSIX 消息队列函数(mq_open、mq_getattr、mq_send、mq_receive)示例

(2016-03-17 16:52:46)

需要注意以下几点:
1、消息队列的名字只能以一个 '/'开头,名字中不能包含其他的'/'
2、mq_receive() 的第三个参数表示读取消息的长度,不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的 mq_attr 结构中 mq_msgsize 的大小。
3、消息的优先级:它是一个小于 MQ_PRIO_MAX 的数,数值越大,优先级越高。 POSIX 消息队列在调用 mq_receive 时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在 mq_send 是置 msg_prio 为 0, mq_receive 的 msg_prio 置为 NULL。
4、默认情况下mq_sendmq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK,如:
struct mq_attr new_attr;
mq_getattr(mqID, &new_attr);        // fetch the queue's current attributes
new_attr.mq_flags = O_NONBLOCK;     // request non-blocking mode
mq_setattr(mqID, &new_attr, NULL);  // apply the attributes; NULL = previous attributes are not returned

代码如下:
mq_posix_send:
#include "unistd.h"
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "mqueue.h"
#include "errno.h"
#include "pthread.h"

// Return values of the two pthread_create() calls made in main() (0 means
// success); despite the names they are not thread identifiers.
int thread_send_id;
int thread_redv_id;
// Handles of the sender and receiver threads.
pthread_t thread_send;
pthread_t thread_recv;
// Thread attribute object handed to pthread_create() in main().
pthread_attr_t attr_t;

// Number of messages the sender thread emits, and the number of messages the
// receiver thread reads before it closes and unlinks the reply queue.
#define MAXNUM 30
// NOTE(review): not referenced anywhere in the visible code of this program.
int msgid = -1;
// Outgoing message (filled by send()) and incoming message (filled by recv()).
// struct msg_st is defined further down in this file.
struct msg_st data_send;
struct msg_st data_recv;

// Capacity, in bytes, of the text payload carried by one message.
#define MAX_TEXT 512
// Message exchanged over the queues. The two ids are used as indices into
// the progs[] name table.
struct msg_st
{
int id_recv;// id of the receiving process
    int id_send;// id of the sending (this) process
    char text[MAX_TEXT];
};

// Program names indexed by the id_send / id_recv values stored in a message;
// index 0 is an empty placeholder.
const char *progs[5]={"", "recv1", "recv2", "recv3", "send"};
// Loop flag shared by main() and both threads: set to 1 in main(), cleared by
// recv() once MAXNUM messages have been received.
int running;

// mqID_send[0..2]: write descriptors for the recv1..recv3 queues.
// mqID_recv: read descriptor for the "/send" reply queue.
mqd_t mqID_send[3], mqID_recv;
// mq_attr_recv is filled by mq_getattr() in main(); mq_attr_send is not
// referenced in the visible code.
struct mq_attr mq_attr_send, mq_attr_recv;

// POSIX message queue names.
#define mqName_send "/send"
#define mqName_recv1 "/recv1"
#define mqName_recv2 "/recv2"
#define mqName_recv3 "/recv3"

/*
 * Sender thread: emits up to MAXNUM messages, one per second, cycling through
 * the three receiver queues in order (recv1, recv2, recv3, recv1, ...).
 *
 * The final message addressed to each receiver (the last three sends) has
 * "...end" appended to its text. The loop also stops early if `running` is
 * cleared. Any mq_send() failure terminates the whole process.
 *
 * arg: unused; it is present so the function has the
 *      void *(*)(void *) signature pthread_create() requires.
 * Returns NULL.
 */
void *send(void *arg)
{
    int sendNum = 0;

    (void)arg;

    while (running)
    {
        sendNum++;

        data_send.id_recv = ((sendNum - 1) % 3) + 1; // cycles 1, 2, 3
        data_send.id_send = 4;
        // Bounded formatting: never write past the end of text[].
        snprintf(data_send.text, sizeof(data_send.text), "%s -> %s -- [%d]",
                 progs[data_send.id_send], progs[data_send.id_recv], sendNum);

        if (sendNum > (MAXNUM - 3)) // last message for each receiver
        {
            // Append the end marker in place instead of going through a
            // fixed-size temporary copy.
            size_t used = strlen(data_send.text);
            snprintf(data_send.text + used, sizeof(data_send.text) - used, "...end");
        }

        // Deliver the message to the queue of the selected receiver.
        if (mq_send(mqID_send[data_send.id_recv - 1], (const char *)&data_send, sizeof(data_send), 0) < 0)
        {
            fprintf(stderr, "mq_send failed: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }
        fprintf(stderr, "[send]%d: %d->%d, %s\n", sendNum, data_send.id_send, data_send.id_recv, data_send.text);

        if (sendNum == MAXNUM)
        {
            break;
        }

        sleep(1);
    }
    return NULL;
}

/*
 * Receiver thread: reads replies from the "/send" queue until MAXNUM messages
 * have arrived, printing each one. After the last message it closes the read
 * descriptor, unlinks the queue and clears `running` so the other loops stop.
 *
 * mq_receive() must be given a buffer of at least mq_msgsize bytes, which can
 * be larger than struct msg_st, so messages are received into a heap buffer
 * of sufficient size and then copied (truncated if necessary) into data_recv.
 *
 * arg: unused; it is present so the function has the
 *      void *(*)(void *) signature pthread_create() requires.
 * Returns NULL. Any failure terminates the whole process.
 */
void *recv(void *arg)
{
    int recvNum = 0;
    size_t bufSize = sizeof(data_recv);
    char *buf;

    (void)arg;

    // The buffer must hold both a full struct msg_st and the largest message
    // the queue can deliver.
    if (mq_attr_recv.mq_msgsize > 0 && (size_t)mq_attr_recv.mq_msgsize > bufSize)
    {
        bufSize = (size_t)mq_attr_recv.mq_msgsize;
    }

    buf = malloc(bufSize);
    if (buf == NULL)
    {
        fprintf(stderr, "malloc failed: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }

    while (running)
    {
        ssize_t got = mq_receive(mqID_recv, buf, bufSize, NULL);
        if (got < 0)
        {
            fprintf(stderr, "mq_receive failed: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }

        // Copy no more than data_recv can hold and keep the text terminated
        // even if the peer sent something malformed.
        size_t copyLen = (size_t)got < sizeof(data_recv) ? (size_t)got : sizeof(data_recv);
        memset(&data_recv, 0, sizeof(data_recv));
        memcpy(&data_recv, buf, copyLen);
        data_recv.text[sizeof(data_recv.text) - 1] = '\0';

        recvNum++;
        fprintf(stderr, "[recv]%d: %d->%d, %s\n", recvNum, data_recv.id_send, data_recv.id_recv, data_recv.text);

        if (recvNum == MAXNUM)
        {
            // Close our descriptor ...
            if (mq_close(mqID_recv) != 0)
            {
                fprintf(stderr, "mq_close failed: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }

            // ... and remove the queue name from the system.
            if (mq_unlink(mqName_send) != 0)
            {
                fprintf(stderr, "mq_unlink failed: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }
            running = 0;
        }
    }

    free(buf);
    return NULL;
}

int main()
{
//建立消息队列
mqID_send[0] = mq_open(mqName_recv1, O_WRONLY | O_CREAT, 0666, NULL);//写
if (mqID_send[0] < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv1, strerror(errno));
exit(EXIT_FAILURE);
}

mqID_send[1] = mq_open(mqName_recv2, O_WRONLY | O_CREAT, 0666, NULL);//写
if (mqID_send[1] < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv2, strerror(errno));
exit(EXIT_FAILURE);
}

mqID_send[2] = mq_open(mqName_recv3, O_WRONLY | O_CREAT, 0666, NULL);//写
if (mqID_send[2] < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv3, strerror(errno));
exit(EXIT_FAILURE);
}

mqID_recv = mq_open(mqName_send, O_RDONLY | O_CREAT, 0666, NULL);//读
if (mqID_recv < 0)
{
fprintf(stderr, "mq_open %s failed: %s\n", mqName_send, strerror(errno));
exit(EXIT_FAILURE);
}

//获得属性
mq_getattr(mqID_recv, &mq_attr_recv);
{
fprintf(stderr, "recv.mq_maxmsg=%ld\n", mq_attr_recv.mq_maxmsg);//队列所允许的最大消息条数
fprintf(stderr, "recv.mq_msgsize=%ld\n", mq_attr_recv.mq_msgsize);//每条消息的最大字节数
fprintf(stderr, "recv.mq_curmsgs=%ld\n", mq_attr_recv.mq_curmsgs);//队列当前的消息条数
}

running = 1;

//发送
while((thread_send_id=pthread_create(&thread_send, &attr_t, send, NULL))!=0)
{
sleep(1);
}

//接收
while((thread_redv_id=pthread_create(&thread_recv, &attr_t, recv, NULL))!=0)
{
sleep(1);
}

while (running)
{
sleep(1);
}
exit(EXIT_SUCCESS);
}

mq_posix_recv1:
#include "unistd.h"
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "mqueue.h"
#include "errno.h"
#include "pthread.h"


// Number of request/reply rounds dealMsg() handles before it closes and
// unlinks this program's queue.
#define MAXNUM 10
// NOTE(review): not referenced anywhere in the visible code of this program.
int msgid = -1;
// Reply being sent and request last received. struct msg_st is defined
// further down in this file.
struct msg_st data_send;
struct msg_st data_recv;

// Capacity, in bytes, of the text payload carried by one message.
#define MAX_TEXT 512
// Message exchanged over the queues. The two ids are used as indices into
// the progs[] name table.
struct msg_st
{
int id_recv;// id of the receiving process
    int id_send;// id of the sending (this) process
    char text[MAX_TEXT];
};

// Program names indexed by the id_send / id_recv values stored in a message;
// index 0 is an empty placeholder.
const char *progs[5]={"", "recv1", "recv2", "recv3", "send"};

// mqID_send: write descriptor for the "/send" reply queue.
// mqID_recv: read descriptor for this program's own request queue.
mqd_t mqID_send, mqID_recv;
// mq_attr_recv is filled by mq_getattr() in main(); mq_attr_send is not
// referenced in the visible code.
struct mq_attr mq_attr_send, mq_attr_recv;

// POSIX message queue names.
#define mqName_send "/send"
#define mqName_recv1 "/recv1"
#define mqName_recv2 "/recv2"
#define mqName_recv3 "/recv3"

/*
 * Handle MAXNUM request/reply rounds for the "recv1" program.
 *
 * Each round receives one message from this program's queue, prints it, and
 * sends a numbered reply to the "/send" queue. The reply of the last round
 * has "...end" appended; after it is sent, the request queue is closed and
 * unlinked. Any failure terminates the process.
 *
 * mq_receive() must be given a buffer of at least mq_msgsize bytes, which can
 * be larger than struct msg_st, so messages are received into a heap buffer
 * of sufficient size and then copied (truncated if necessary) into data_recv.
 */
void dealMsg(void)
{
    int recvNum = 0;
    int sendNum = 0;
    size_t bufSize = sizeof(data_recv);
    char *buf;

    // The buffer must hold both a full struct msg_st and the largest message
    // the queue can deliver.
    if (mq_attr_recv.mq_msgsize > 0 && (size_t)mq_attr_recv.mq_msgsize > bufSize)
    {
        bufSize = (size_t)mq_attr_recv.mq_msgsize;
    }

    buf = malloc(bufSize);
    if (buf == NULL)
    {
        fprintf(stderr, "malloc failed: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }

    while (recvNum < MAXNUM)
    {
        // Receive one request.
        ssize_t got = mq_receive(mqID_recv, buf, bufSize, NULL);
        if (got < 0)
        {
            fprintf(stderr, "mq_receive failed: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }

        // Copy no more than data_recv can hold and keep the text terminated.
        size_t copyLen = (size_t)got < sizeof(data_recv) ? (size_t)got : sizeof(data_recv);
        memset(&data_recv, 0, sizeof(data_recv));
        memcpy(&data_recv, buf, copyLen);
        data_recv.text[sizeof(data_recv.text) - 1] = '\0';

        recvNum++;
        fprintf(stderr, "[recv]%d: %d->%d, %s\n", recvNum, data_recv.id_send, data_recv.id_recv, data_recv.text);

        // Build the reply.
        sendNum++;
        data_send.id_recv = 4;
        data_send.id_send = 1;
        snprintf(data_send.text, sizeof(data_send.text), "%s -> %s -- [%d]",
                 progs[data_send.id_send], progs[data_send.id_recv], sendNum);

        if (recvNum == MAXNUM)
        {
            // Mark the final reply in place.
            size_t used = strlen(data_send.text);
            snprintf(data_send.text + used, sizeof(data_send.text) - used, "...end");
        }

        // Send the reply.
        if (mq_send(mqID_send, (const char *)&data_send, sizeof(data_send), 0) < 0)
        {
            fprintf(stderr, "mq_send failed: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }
        fprintf(stderr, "[send]%d: %d->%d, %s\n", sendNum, data_send.id_send, data_send.id_recv, data_send.text);

        // After the last round, close and remove this program's queue.
        if (recvNum == MAXNUM)
        {
            if (mq_close(mqID_recv) != 0)
            {
                fprintf(stderr, "mq_close failed: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }

            if (mq_unlink(mqName_recv1) != 0)
            {
                fprintf(stderr, "mq_unlink failed: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }
        }
    }

    free(buf);
}

/*
 * Entry point of the "recv1" program.
 *
 * Opens (creating if needed) the "/send" queue for writing and the "/recv1"
 * queue for reading, prints the attributes of the read queue, then runs the
 * request/reply loop. Exits with EXIT_FAILURE if a queue cannot be opened or
 * queried.
 */
int main(void)
{
    // Queue used to send replies.
    mqID_send = mq_open(mqName_send, O_WRONLY | O_CREAT, 0666, NULL);
    if (mqID_send == (mqd_t)-1)
    {
        fprintf(stderr, "mq_open %s failed: %s\n", mqName_send, strerror(errno));
        exit(EXIT_FAILURE);
    }

    // Queue this program reads requests from.
    mqID_recv = mq_open(mqName_recv1, O_RDONLY | O_CREAT, 0666, NULL);
    if (mqID_recv == (mqd_t)-1)
    {
        fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv1, strerror(errno));
        exit(EXIT_FAILURE);
    }

    // dealMsg() sizes its mq_receive() call from these attributes, so a
    // failure here must not be ignored.
    if (mq_getattr(mqID_recv, &mq_attr_recv) != 0)
    {
        fprintf(stderr, "mq_getattr failed: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }
    fprintf(stderr, "recv.mq_maxmsg=%ld\n", mq_attr_recv.mq_maxmsg);   // max number of messages in the queue
    fprintf(stderr, "recv.mq_msgsize=%ld\n", mq_attr_recv.mq_msgsize); // max size of one message in bytes
    fprintf(stderr, "recv.mq_curmsgs=%ld\n", mq_attr_recv.mq_curmsgs); // messages currently queued

    dealMsg();
    exit(EXIT_SUCCESS);
}

mq_posix_recv2:
#include "unistd.h"
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "mqueue.h"
#include "errno.h"
#include "pthread.h"


// Number of request/reply rounds dealMsg() handles before it closes and
// unlinks this program's queue.
#define MAXNUM 10
// NOTE(review): not referenced anywhere in the visible code of this program.
int msgid = -1;
// Reply being sent and request last received. struct msg_st is defined
// further down in this file.
struct msg_st data_send;
struct msg_st data_recv;

// Capacity, in bytes, of the text payload carried by one message.
#define MAX_TEXT 512
// Message exchanged over the queues. The two ids are used as indices into
// the progs[] name table.
struct msg_st
{
int id_recv;// id of the receiving process
    int id_send;// id of the sending (this) process
    char text[MAX_TEXT];
};

// Program names indexed by the id_send / id_recv values stored in a message;
// index 0 is an empty placeholder.
const char *progs[5]={"", "recv1", "recv2", "recv3", "send"};

// mqID_send: write descriptor for the "/send" reply queue.
// mqID_recv: read descriptor for this program's own request queue.
mqd_t mqID_send, mqID_recv;
// mq_attr_recv is filled by mq_getattr() in main(); mq_attr_send is not
// referenced in the visible code.
struct mq_attr mq_attr_send, mq_attr_recv;

// POSIX message queue names.
#define mqName_send "/send"
#define mqName_recv1 "/recv1"
#define mqName_recv2 "/recv2"
#define mqName_recv3 "/recv3"

/*
 * Handle MAXNUM request/reply rounds for the "recv2" program.
 *
 * Each round receives one message from this program's queue, prints it, and
 * sends a numbered reply to the "/send" queue. The reply of the last round
 * has "...end" appended; after it is sent, the request queue is closed and
 * unlinked. Any failure terminates the process.
 *
 * mq_receive() must be given a buffer of at least mq_msgsize bytes, which can
 * be larger than struct msg_st, so messages are received into a heap buffer
 * of sufficient size and then copied (truncated if necessary) into data_recv.
 */
void dealMsg(void)
{
    int recvNum = 0;
    int sendNum = 0;
    size_t bufSize = sizeof(data_recv);
    char *buf;

    // The buffer must hold both a full struct msg_st and the largest message
    // the queue can deliver.
    if (mq_attr_recv.mq_msgsize > 0 && (size_t)mq_attr_recv.mq_msgsize > bufSize)
    {
        bufSize = (size_t)mq_attr_recv.mq_msgsize;
    }

    buf = malloc(bufSize);
    if (buf == NULL)
    {
        fprintf(stderr, "malloc failed: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }

    while (recvNum < MAXNUM)
    {
        // Receive one request.
        ssize_t got = mq_receive(mqID_recv, buf, bufSize, NULL);
        if (got < 0)
        {
            fprintf(stderr, "mq_receive failed: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }

        // Copy no more than data_recv can hold and keep the text terminated.
        size_t copyLen = (size_t)got < sizeof(data_recv) ? (size_t)got : sizeof(data_recv);
        memset(&data_recv, 0, sizeof(data_recv));
        memcpy(&data_recv, buf, copyLen);
        data_recv.text[sizeof(data_recv.text) - 1] = '\0';

        recvNum++;
        fprintf(stderr, "[recv]%d: %d->%d, %s\n", recvNum, data_recv.id_send, data_recv.id_recv, data_recv.text);

        // Build the reply.
        sendNum++;
        data_send.id_recv = 4;
        data_send.id_send = 2;
        snprintf(data_send.text, sizeof(data_send.text), "%s -> %s -- [%d]",
                 progs[data_send.id_send], progs[data_send.id_recv], sendNum);

        if (recvNum == MAXNUM)
        {
            // Mark the final reply in place.
            size_t used = strlen(data_send.text);
            snprintf(data_send.text + used, sizeof(data_send.text) - used, "...end");
        }

        // Send the reply.
        if (mq_send(mqID_send, (const char *)&data_send, sizeof(data_send), 0) < 0)
        {
            fprintf(stderr, "mq_send failed: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }
        fprintf(stderr, "[send]%d: %d->%d, %s\n", sendNum, data_send.id_send, data_send.id_recv, data_send.text);

        // After the last round, close and remove this program's queue.
        if (recvNum == MAXNUM)
        {
            if (mq_close(mqID_recv) != 0)
            {
                fprintf(stderr, "mq_close failed: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }

            if (mq_unlink(mqName_recv2) != 0)
            {
                fprintf(stderr, "mq_unlink failed: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }
        }
    }

    free(buf);
}

/*
 * Entry point of the "recv2" program.
 *
 * Opens (creating if needed) the "/send" queue for writing and the "/recv2"
 * queue for reading, prints the attributes of the read queue, then runs the
 * request/reply loop. Exits with EXIT_FAILURE if a queue cannot be opened or
 * queried.
 */
int main(void)
{
    // Queue used to send replies.
    mqID_send = mq_open(mqName_send, O_WRONLY | O_CREAT, 0666, NULL);
    if (mqID_send == (mqd_t)-1)
    {
        fprintf(stderr, "mq_open %s failed: %s\n", mqName_send, strerror(errno));
        exit(EXIT_FAILURE);
    }

    // Queue this program reads requests from.
    mqID_recv = mq_open(mqName_recv2, O_RDONLY | O_CREAT, 0666, NULL);
    if (mqID_recv == (mqd_t)-1)
    {
        fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv2, strerror(errno));
        exit(EXIT_FAILURE);
    }

    // dealMsg() sizes its mq_receive() call from these attributes, so a
    // failure here must not be ignored.
    if (mq_getattr(mqID_recv, &mq_attr_recv) != 0)
    {
        fprintf(stderr, "mq_getattr failed: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }
    fprintf(stderr, "recv.mq_maxmsg=%ld\n", mq_attr_recv.mq_maxmsg);   // max number of messages in the queue
    fprintf(stderr, "recv.mq_msgsize=%ld\n", mq_attr_recv.mq_msgsize); // max size of one message in bytes
    fprintf(stderr, "recv.mq_curmsgs=%ld\n", mq_attr_recv.mq_curmsgs); // messages currently queued

    dealMsg();
    exit(EXIT_SUCCESS);
}

mq_posix_recv3:
#include "unistd.h"
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "mqueue.h"
#include "errno.h"
#include "pthread.h"


// Number of request/reply rounds dealMsg() handles before it closes and
// unlinks this program's queue.
#define MAXNUM 10
// NOTE(review): not referenced anywhere in the visible code of this program.
int msgid = -1;
// Reply being sent and request last received. struct msg_st is defined
// further down in this file.
struct msg_st data_send;
struct msg_st data_recv;

// Capacity, in bytes, of the text payload carried by one message.
#define MAX_TEXT 512
// Message exchanged over the queues. The two ids are used as indices into
// the progs[] name table.
struct msg_st
{
int id_recv;// id of the receiving process
    int id_send;// id of the sending (this) process
    char text[MAX_TEXT];
};

// Program names indexed by the id_send / id_recv values stored in a message;
// index 0 is an empty placeholder.
const char *progs[5]={"", "recv1", "recv2", "recv3", "send"};

// mqID_send: write descriptor for the "/send" reply queue.
// mqID_recv: read descriptor for this program's own request queue.
mqd_t mqID_send, mqID_recv;
// mq_attr_recv is filled by mq_getattr() in main(); mq_attr_send is not
// referenced in the visible code.
struct mq_attr mq_attr_send, mq_attr_recv;

// POSIX message queue names.
#define mqName_send "/send"
#define mqName_recv1 "/recv1"
#define mqName_recv2 "/recv2"
#define mqName_recv3 "/recv3"

/*
 * Handle MAXNUM request/reply rounds for the "recv3" program.
 *
 * Each round receives one message from this program's queue, prints it, and
 * sends a numbered reply to the "/send" queue. The reply of the last round
 * has "...end" appended; after it is sent, the request queue is closed and
 * unlinked. Any failure terminates the process.
 *
 * mq_receive() must be given a buffer of at least mq_msgsize bytes, which can
 * be larger than struct msg_st, so messages are received into a heap buffer
 * of sufficient size and then copied (truncated if necessary) into data_recv.
 */
void dealMsg(void)
{
    int recvNum = 0;
    int sendNum = 0;
    size_t bufSize = sizeof(data_recv);
    char *buf;

    // The buffer must hold both a full struct msg_st and the largest message
    // the queue can deliver.
    if (mq_attr_recv.mq_msgsize > 0 && (size_t)mq_attr_recv.mq_msgsize > bufSize)
    {
        bufSize = (size_t)mq_attr_recv.mq_msgsize;
    }

    buf = malloc(bufSize);
    if (buf == NULL)
    {
        fprintf(stderr, "malloc failed: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }

    while (recvNum < MAXNUM)
    {
        // Receive one request.
        ssize_t got = mq_receive(mqID_recv, buf, bufSize, NULL);
        if (got < 0)
        {
            fprintf(stderr, "mq_receive failed: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }

        // Copy no more than data_recv can hold and keep the text terminated.
        size_t copyLen = (size_t)got < sizeof(data_recv) ? (size_t)got : sizeof(data_recv);
        memset(&data_recv, 0, sizeof(data_recv));
        memcpy(&data_recv, buf, copyLen);
        data_recv.text[sizeof(data_recv.text) - 1] = '\0';

        recvNum++;
        fprintf(stderr, "[recv]%d: %d->%d, %s\n", recvNum, data_recv.id_send, data_recv.id_recv, data_recv.text);

        // Build the reply.
        sendNum++;
        data_send.id_recv = 4;
        data_send.id_send = 3;
        snprintf(data_send.text, sizeof(data_send.text), "%s -> %s -- [%d]",
                 progs[data_send.id_send], progs[data_send.id_recv], sendNum);

        if (recvNum == MAXNUM)
        {
            // Mark the final reply in place.
            size_t used = strlen(data_send.text);
            snprintf(data_send.text + used, sizeof(data_send.text) - used, "...end");
        }

        // Send the reply.
        if (mq_send(mqID_send, (const char *)&data_send, sizeof(data_send), 0) < 0)
        {
            fprintf(stderr, "mq_send failed: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }
        fprintf(stderr, "[send]%d: %d->%d, %s\n", sendNum, data_send.id_send, data_send.id_recv, data_send.text);

        // After the last round, close and remove this program's queue.
        if (recvNum == MAXNUM)
        {
            if (mq_close(mqID_recv) != 0)
            {
                fprintf(stderr, "mq_close failed: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }

            if (mq_unlink(mqName_recv3) != 0)
            {
                fprintf(stderr, "mq_unlink failed: %s\n", strerror(errno));
                exit(EXIT_FAILURE);
            }
        }
    }

    free(buf);
}

/*
 * Entry point of the "recv3" program.
 *
 * Opens (creating if needed) the "/send" queue for writing and the "/recv3"
 * queue for reading, prints the attributes of the read queue, then runs the
 * request/reply loop. Exits with EXIT_FAILURE if a queue cannot be opened or
 * queried.
 */
int main(void)
{
    // Queue used to send replies.
    mqID_send = mq_open(mqName_send, O_WRONLY | O_CREAT, 0666, NULL);
    if (mqID_send == (mqd_t)-1)
    {
        fprintf(stderr, "mq_open %s failed: %s\n", mqName_send, strerror(errno));
        exit(EXIT_FAILURE);
    }

    // Queue this program reads requests from.
    mqID_recv = mq_open(mqName_recv3, O_RDONLY | O_CREAT, 0666, NULL);
    if (mqID_recv == (mqd_t)-1)
    {
        fprintf(stderr, "mq_open %s failed: %s\n", mqName_recv3, strerror(errno));
        exit(EXIT_FAILURE);
    }

    // dealMsg() sizes its mq_receive() call from these attributes, so a
    // failure here must not be ignored.
    if (mq_getattr(mqID_recv, &mq_attr_recv) != 0)
    {
        fprintf(stderr, "mq_getattr failed: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }
    fprintf(stderr, "recv.mq_maxmsg=%ld\n", mq_attr_recv.mq_maxmsg);   // max number of messages in the queue
    fprintf(stderr, "recv.mq_msgsize=%ld\n", mq_attr_recv.mq_msgsize); // max size of one message in bytes
    fprintf(stderr, "recv.mq_curmsgs=%ld\n", mq_attr_recv.mq_curmsgs); // messages currently queued

    dealMsg();
    exit(EXIT_SUCCESS);
}

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有