POSIX 消息隊列
POSIX消息隊列與System V消息隊列的主要區別:
1.對POSIX隊列的讀總數返回最高優先級到最早消息,對SV隊列到讀則可以返回任意指定優先級的消息
2.當往一個空隊列放置一個消息時,POSIX允許產生一個信號或啟動一個線程,System V不提供此機制
消息的屬性:
1.一個無符號整數的優先級(POSIX)或一個長整數的類型(SV)
2.消息的數據部分長度(可以為0)
3.數據本身(如果長度大于0)
POSIX消息隊列總結:
mq_open創建一個新隊列或者打開一個已經存在的隊列
mq_close關閉隊列
mq_unlink刪除隊列名,刪除隊列
mq_send往隊列放置消息
mq_receive從一個隊列中讀出消息
mq_setattr和mq_getattr查詢和設置隊列的屬性
mq_notify允許注冊一個信號或者線程,在有一個消息被放置到空隊列時,發送信號或者激活線程
每個消息被賦予一個小整數優先級,mq_receive總是返回最高優先級的最早消息
限制:
/proc/sys/fs/mqueue/msg_max 10
/proc/sys/fs/mqueue/msgsize_max 8192
/proc/sys/fs/mqueue/queues_max 256
創建一個新的消息隊列或者打開一個已經存在的消息隊列
<mqueue.h> 注意:編譯加-lrt
<fcntl.h>
<sys/stat.h>
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
成功返回描述字,失敗返回-1并設置errno
name: 必須為/開頭!!!
oflag: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK
關閉消息隊列,但不能刪除它
mqd_t mq_close(mqd_t mqdes);
成功返回0,失敗返回-1
刪除消息隊列,不一定馬上刪除消息隊列,但隊列名會立即刪除
mqd_t mq_unlink(const char *name);
成功返回0,失敗返回-1
當某個進程還沒有關閉此消息隊列時,調用mq_unlink時,不會馬上刪除隊列,當最后一個進程關閉隊列時,該隊列被刪除
int flags;
mqd_t mqd;
flags = O_RDWR | O_CREAT | O_EXCL;
mqd = mq_open("/tmp.111", flags, 0644, NULL);
if (mqd == (mqd_t)-1) {
perror("mq_open");
return 1;
}
消息隊列的屬性
mq_getattr mq_setattr
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
成功返回0,失敗返回-1
struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
};
mq_setattr只能修改mq_flags屬性,maxmsg和msgsize在mq_open時設置
mqd_t mqd;
struct mq_attr attr;
mqd = mq_open(argv[1], O_RDONLY);
mq_getattr(mqd, &attr);
printf("maxmsg=%ld, msgsize=%ld, curmsgs=%ld\n",
attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
mq_close(mqd);
收發消息
mq_send mq_receive
mq_receive返回隊列中最高優先級的最早消息,而且該優先級能隨該消息的內容及其長度一起返回
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
成功返回消息的長度,消息的實際長度,不包括消息頭;失敗返回-1
msg_len指示msg_ptr的長度,必須大于等于mq_msgsize
如果msg_prio不為NULL,函數返回消息的優先級
如果隊列為空,調用將阻塞,如果隊列設置0_NONBLOCK,調用立即返回EAGAIN
// 向隊列加入一條消息
mqd_t mqd;
char *msg;
size_t len;
unsigned int prio;
len = 100;
prio = 5;
mqd = mq_open("/abc.123", O_WRONLY);
msg = (char *)malloc(len);
memset(msg, 0, len);
mq_send(mqd, msg, len, prio);
// 從隊列讀入一條消息
mqd_t mqd;
char *msg;
size_t len;
int n;
unsigned int prio;
struct mq_attr attr;
mqd = mq_open("/abc.123", O_RDONLY);
mq_getattr(mqd, &attr);
len = attr.mq_msgsize;
msg = (char *)malloc(len);
memset(msg, 0, len);
n = mq_receive(mqd, msg, len, &prio);
printf("read %ld bytes, priority=%u\n", (long)n, prio);
隊列限制
long int open_max = sysconf(_SC_MQ_OPEN_MAX); // -1
long int prio_max = sysconf(_SC_MQ_PRIO_MAX); // 32768
消息通告
當往空隊列放置了一個消息時,通知進程
通告方式有2種:
1. 產生一個信號
2. 創建一個線程執行一個指定的函數
mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
成功返回0;失敗返回-1
給隊列建立或者刪除異步事件通知
1.如果notification非空,那么當前進程希望在有一個消息到達而且隊列先前為空時得到通知,該進程被注冊為接收該隊列的通知
2.如果notification為空,而且當前進程目前被注冊為接收該隊列的通知,那么現有注冊將被撤銷
3.任意時刻只有一個進程可以被注冊為接收隊列的通知
4.當有一個消息到達一個空隊列,而且已經有一個進程被注冊為接收該隊列的通知時,只有在沒有任何線程阻塞在該隊列的mq_receive調用的前提下,通知才會發送。即在mq_receive調用中的阻塞比任何通知的注冊都優先
5.當該通知已經發送給它的注冊進程時,其注冊即被撤銷。該進程必須再次調用mq_notify以重新注冊
6.當調用mq_notify但是隊列不為空時,通知不會發送;當隊列變為空,并且有一個消息入隊時,才發送通知
union sigval { /* Data passed with notification */
int sival_int; /* Integer value */
void *sival_ptr; /* Pointer value */
};
struct sigevent {
int sigev_notify; /* Notification method */
int sigev_signo; /* Notification signal */
union sigval sigev_value; /* Data passed with notification */
void (*sigev_notify_function) (union sigval);
/* Function for thread notification */
void *sigev_notify_attributes;
/* Thread function attributes */
};
sigev_notify:SIGEV_NONE,SIGEV_SIGNAL,SIGEV_THREAD
// 使用非阻塞mq_receive的信號通知
volatile sig_atomic_t mqflag;
static void sig_usr1(int);
int main(int argc, char *argv[])
{
mqd_t mqd;
void *buf;
ssize_t n;
sigset_t zeromask, newmask, oldmask;
struct mq_attr attr;
struct sigevent sigev;
assert(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
buf = malloc(attr.mq_msgsize);
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigemptyset(&oldmask);
sigaddset(&newmask, SIGUSR1);
signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);
for ( ; ; ) {
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
while (mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0;
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
sigprocmask(SIG_UNBLOCK, &newmask, NULL);
}
return 0;
}
static void sig_usr1(int signo)
{
mqflag = 1;
return;
}
// 使用sigwait代替信號處理程序的信號通知
#include <signal.h>
int sigwait(const sigset_t *set, int *sig);
成功返回0,并設置sig為收到的信號;失敗返回錯誤碼
int main(...)
{
...
sigemptyset(&newmask);
sigaddset(&newmask, SIGUSR1);
sigprocmask(SIGBLOCK, &newmask, NULL);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);
for ( ; ; ) {
sigwait(&newmask, &signo);
if (signo == SIGUSR1) {
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >=0) {
printf("read %ld bytes\n", n);
}
if (errno != EAGAIN)
die("mq_receive");
}
}
...
}
// 使用select的POSIX消息隊列
int pfds[2];
static void sig_usr1(int);
int main(int argc, char *arg[])
{
int fds;
char c;
fd_set rfds;
mqd_t mqd;
void *buf;
ssize_t n;
size_t len;
struct mq_attr attr;
struct sigevent sigev;
asset(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
len = attr.mq_msgsize;
buf = malloc(len);
pipe(pfds);
// 設置信號處理程序,建立通知
signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);
FD_ZERO(&rfds);
for ( ; ; ) {
FD_SET(pfds[0], &rfds);
nfds = select(pfds[0]+1, &rfds, NULL, NULL, NULL);
if (FD_ISSET(pfds[0], &rfds)) { // 管道可讀
read(pfds[0], &c, 1);
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
printf("read %ld bytes\n", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
}
}
return 0;
}
static void sig_usr1(int signo)
{
write(pfds[1], "", 1); // 異步信號處理安全的函數
return;
}
// 收到通知后,啟動一個線程,接收消息,然后結束進程
#include <pthread.h>
#include <mqueue.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define die(msg) { perror(msg); exit(EXIT_FAILURE); }
static void tfunc(union sigval sv) /* Thread start function */
{
struct mq_attr attr;
ssize_t nr;
void *buf;
mqd_t mqdes = *((mqd_t *) sv.sival_ptr);
/* Determine max. msg size; allocate buffer to receive msg */
if (mq_getattr(mqdes, &attr) == -1) die("mq_getattr");
buf = malloc(attr.mq_msgsize);
if (buf == NULL) die("malloc");
nr = mq_receive(mqdes, buf, attr.mq_msgsize, NULL);
if (nr == -1) die("mq_receive");
printf("Read %ld bytes from MQ0\n", (long) nr);
free(buf);
exit(EXIT_SUCCESS); /* Terminate the process */
}
int main(int argc, char *argv[])
{
mqd_t mqdes;
struct sigevent not;
assert(argc == 2);
mqdes = mq_open(argv[1], O_RDONLY);
if (mqdes == (mqd_t) -1) die("mq_open");
not.sigev_notify = SIGEV_THREAD;
not.sigev_notify_function = tfunc;
not.sigev_notify_attributes = NULL;
not.sigev_value.sival_ptr = &mqdes; /* Arg. to thread func. */
if (mq_notify(mqdes, ¬) == -1) die("mq_notify");
pause(); /* Process will be terminated by thread function */
return 0;
}
// 啟動一個新線程
mqd_t mqd;
struct mq_attr attr;
struct sigevent sigev;
static void notify_thread(union sigval);
int main(int argc, char *argv[])
{
assert(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_notify_attributes = NULL;
mq_notify(mqd, &sigev);
for ( ; ; )
pause();
return 0;
}
static void notify_thread(union sigval arg)
{
ssize_t n;
size_t len;
void *buf;
len = attr.mq_msgsize;
printf("notify_thread started\n");
buf = malloc(len);
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
printf("read %ld bytes\n", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
free(buf);
pthread_exit(NULL);
}
POSIX實時信號
unix信號分為兩大組:
實時信號:SIGRTMIN--SIGRTMAX
其他信號:SIGINT, SIGQUIT, SIGKILL, ...
信號的實時行為取決于SA_SIGINFO
實時行為包含以下特征:
1.信號是排隊的,即如果一個信號產生了3次,它就遞交3次。以FIFO的順序排隊
2.當有多種SIGRTMIN到SIGRTMAX范圍內的解阻塞信號排隊時,值較小的信號先于值較大的信號遞交(注意:linux與此相反)
3.當某個非實時信號遞交時,傳遞給它的信號處理的唯一參數是該信號的值,實時信號比其他信號傳遞更多的信息
4.有些新函數使用實時信號工作,如sigqueue用來代替kill
// 查看實時信號的遞交順序
static void sig_rt(int, siginfo_t *, void *);
int main(void)
{
int i, j;
pid_t pid;
sigset_t newset;
union sigval val;
printf("SIGRTMIN=%d, SIGRTMAX=%d\n", (int)SIGRTMIN, (int)SIGRTMAX);
pid = fork();
if (pid < 0) die("fork");
else if (pid == 0) {
/* 阻塞3個實時信號 */
sigemptyset(&newset);
sigaddset(&newset, SIGRTMIN);
sigaddset(&newset, SIGRTMIN+1);
sigaddset(&newset, SIGRTMIN+2);
sigprocmask(SIG_BLOCK, &newset, NULL);
signal_rt(SIGRTMIN, sig_rt);
signal_rt(SIGRTMIN+1, sig_rt);
signal_rt(SIGRTMIN+2, sig_rt);
sleep(6);
sigprocmask(SIG_UNBLOCK, &newset, NULL);
sleep(3);
exit(0);
}
else {
sleep(3);
for (i=SIGRTMIN; i<=SIGRTMIN+2; i++) {
for (j=0; j<=2; j++) {
val.sival_int = j;
sigqueue(pid, i, val);
printf("send signal signo=%d, val=%d\n", i, j);
}
}
exit(0);
}
}
static void sig_rt(int signo, siginfo_t *info, void *context)
{
printf("receive signal signo=%d, code=%d, ival=%d\n",
signo, info->si_code, info->si_value.sival_int);
}
typedef void sigfunc_rt(int, siginfo_t *, void *);
sigfunc_rt *signal_rt(int signo, sigfunc_rt *func)
{
struct sigaction act, oact;
act.sa_sigaction = func;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_SIGINFO; /* 實時信號必須指定 */
if (signo == SIGALRM) {
#ifdef SA_INTERRUPT
act.sa_flags |= SA_INTERRUPT;
#endif
}
else {
#ifdef SA_RESTART
act.sa_flags |= SA_RESTART;
#endif
}
if (sigaction(signo, &act, &oact) < 0)
return (sigfunc_rt *)SIG_ERR;
else
return oact.sa_sigaction;
}
輸出如下:
[root@jiangkun unp]# ./rtsig
SIGRTMIN=34, SIGRTMAX=64
send signal signo=34, val=0
send signal signo=34, val=1
send signal signo=34, val=2
send signal signo=35, val=0
send signal signo=35, val=1
send signal signo=35, val=2
send signal signo=36, val=0
send signal signo=36, val=1
send signal signo=36, val=2
receive signal signo=36, code=-1, ival=0
receive signal signo=36, code=-1, ival=1
receive signal signo=36, code=-1, ival=2
receive signal signo=35, code=-1, ival=0
receive signal signo=35, code=-1, ival=1
receive signal signo=35, code=-1, ival=2
receive signal signo=34, code=-1, ival=0
receive signal signo=34, code=-1, ival=1
receive signal signo=34, code=-1, ival=2
struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void); /* 被遺棄了! */
};
實時信號之所以是可靠的,因為在進程阻塞該信號的時間內,發給該進程的所有實時信號會排隊,而非實時信號則會合并為一個信號。早期的kill函數只能向特 定的進程發送一個特定的信號,并且早期的信號處理函數也不能接受附加數據。siqueue和sigaction解決了這個問題。
下面這個例子中,進程先屏蔽SIGINT和SIGRTMIN兩個信號,其中SIGINT是非實時信號,而SIGRTMIN為實時信號,接著進程睡眠,睡眠完成之后再接觸對這兩個信號的屏蔽,此時可以比較對兩種信號的處理方式是否一樣。
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
void sig_handler(int, siginfo_t*, void*);
int main(int argc,char *argv[])
{
struct sigaction act;
sigset_t newmask, oldmask;
int rc;
sigemptyset(&newmask);
/* 往信號集中添加一個非實時信號 */
sigaddset(&newmask, SIGINT);
/* 往信號集中添加一個實時信號 */
sigaddset(&newmask, SIGRTMIN);
/* 屏蔽實時信號SIGRTMIN */
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
act.sa_sigaction = sig_handler;
act.sa_flags = SA_SIGINFO;
if(sigaction(SIGINT, &act, NULL) < 0) {
printf("install signal error\n");
}
if(sigaction(SIGRTMIN, &act, NULL) < 0) {
printf("install signal error\n");
}
printf("pid = %d\n", getpid());
/* 進程睡眠,在此時間內的發給該進程的所有實時信號 將排隊,不會有信號丟失 */
sleep(20);
/* 解除對SIGRTMIN信號的屏蔽,信號處理函數將會被調用 */
sigprocmask(SIG_SETMASK, &oldmask, NULL);
return 0;
}
void sig_handler(int signo, siginfo_t *info, void *context)
{
if(signo == SIGINT)
printf("Got a common signal\n");
else
printf("Got a real time signal\n");
}
將程序編譯好之后,再開一個終端用于發送實時信號。
# ./sigqueue_receive
pid = 8871
進程開始睡眠……
在新的終端輸入:
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
連續發送四個SIGRTMIN,接著回到之前的終端,連續四次按下"ctrl+c"。
^C^C^C^C
最后進程終于醒來,整個輸出如下:
pid = 8871
^C^C^C^CGot a real time signal
Got a real time signal
Got a real time signal
Got a real time signal
Got a common signal
果然接受到四個實時信號,并且四次調用了信號處理函數,而對于SIGINT,雖然也按下了四次"ctrl+c",但是進程對其只做一次處理。這個例子中是先發實時信號后發非實時信號,所以信號處理函數先處理實
時信號,如果只是按照順序注冊信號的話,這很好理解,但是換一下,先按下了四次"ctrl+c"然后使用kill發四次實時信號,結果發現輸出的結果仍然 一樣,這說明實時信號的優先級比非實時信號要高,內核每個進程的信號組成一個雙向鏈表,實時信號插入的時候就不是隨便插在尾部了。
信號的優先級:信號實質上是軟中斷,中斷有優先級,信號也有優先級。如果一個進程有多個未決信號,則對于同一個未決的實時信號,內核將按照發送的順序來遞 交信號。如果存在多個未決的實時信號,則值(或者說編號)越大的越先被遞送。如果既存在不可靠信號,又存在可靠信號(實時信號),雖然POSIX對這一情 況沒有明確規定,但Linux系統和大多數遵循POSIX標準的操作系統一樣,將優先遞交可靠信號。一個進程如果處理 SIGQUIT(3),SIGINT(2),SIGHUP(1)(通過"kill -l" 可以查看信號的編號),那么先后給該進程發送SIGINT,SIGHUP,SIGQUIT,處理的順序會是SIGQUIT,SIGINT,SIGHUP, 不論改變這個三個信號的發送順序,處理的順序都是一樣的。