<menu id="w8yyk"><menu id="w8yyk"></menu></menu>
  • <dd id="w8yyk"><nav id="w8yyk"></nav></dd>
    <menu id="w8yyk"></menu>
    <menu id="w8yyk"><code id="w8yyk"></code></menu>
    <menu id="w8yyk"></menu>
    <xmp id="w8yyk">
    <xmp id="w8yyk"><nav id="w8yyk"></nav>
  • 網站首頁 > 物聯資訊 > 技術分享

    POSIX 消息隊列

    2016-09-28 00:00:00 廣州睿豐德信息科技有限公司 閱讀
    睿豐德科技 專注RFID識別技術和條碼識別技術與管理軟件的集成項目。質量追溯系統、MES系統、金蝶與條碼系統對接、用友與條碼系統對接

    POSIX消息隊列與System V消息隊列的主要區別:
    1.對POSIX隊列的讀總數返回最高優先級到最早消息,對SV隊列到讀則可以返回任意指定優先級的消息
    2.當往一個空隊列放置一個消息時,POSIX允許產生一個信號或啟動一個線程,System V不提供此機制

    消息的屬性:
    1.一個無符號整數的優先級(POSIX)或一個長整數的類型(SV)
    2.消息的數據部分長度(可以為0)
    3.數據本身(如果長度大于0)

    POSIX消息隊列總結:
    mq_open創建一個新隊列或者打開一個已經存在的隊列
    mq_close關閉隊列
    mq_unlink刪除隊列名,刪除隊列
    mq_send往隊列放置消息
    mq_receive從一個隊列中讀出消息
    mq_setattr和mq_getattr查詢和設置隊列的屬性
    mq_notify允許注冊一個信號或者線程,在有一個消息被放置到空隊列時,發送信號或者激活線程
    每個消息被賦予一個小整數優先級,mq_receive總是返回最高優先級的最早消息

    限制:
    /proc/sys/fs/mqueue/msg_max 10
    /proc/sys/fs/mqueue/msgsize_max 8192
    /proc/sys/fs/mqueue/queues_max 256

    創建一個新的消息隊列或者打開一個已經存在的消息隊列
    <mqueue.h> 注意:編譯加-lrt
    <fcntl.h>
    <sys/stat.h>
    mqd_t mq_open(const char *name, int oflag);
    mqd_t mq_open(const char *name, int oflag, mode_t mode,  struct mq_attr *attr);
    成功返回描述字,失敗返回-1并設置errno
    name: 必須為/開頭!!!
    oflag: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK

    關閉消息隊列,但不能刪除它
    mqd_t mq_close(mqd_t mqdes);
    成功返回0,失敗返回-1

    刪除消息隊列,不一定馬上刪除消息隊列,但隊列名會立即刪除
    mqd_t mq_unlink(const char *name);
    成功返回0,失敗返回-1
    當某個進程還沒有關閉此消息隊列時,調用mq_unlink時,不會馬上刪除隊列,當最后一個進程關閉隊列時,該隊列被刪除
    int flags;
    mqd_t mqd;
    flags = O_RDWR | O_CREAT | O_EXCL;
    mqd = mq_open("/tmp.111", flags, 0644, NULL);
    if (mqd == (mqd_t)-1) {
    perror("mq_open");
    return 1;
    }

    消息隊列的屬性
    mq_getattr mq_setattr
    mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
    成功返回0,失敗返回-1
    struct mq_attr {
    long mq_flags;       /* Flags: 0 or O_NONBLOCK */
    long mq_maxmsg;      /* Max. # of messages on queue */
    long mq_msgsize;     /* Max. message size (bytes) */
    long mq_curmsgs;     /* # of messages currently in queue */
    };
    mq_setattr只能修改mq_flags屬性,maxmsg和msgsize在mq_open時設置
    mqd_t mqd;
    struct mq_attr attr;
    mqd = mq_open(argv[1], O_RDONLY);
    mq_getattr(mqd, &attr);
    printf("maxmsg=%ld, msgsize=%ld, curmsgs=%ld\n",
    attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
    mq_close(mqd);

    收發消息
    mq_send mq_receive
    mq_receive返回隊列中最高優先級的最早消息,而且該優先級能隨該消息的內容及其長度一起返回

    ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
    成功返回消息的長度,消息的實際長度,不包括消息頭;失敗返回-1
    msg_len指示msg_ptr的長度,必須大于等于mq_msgsize
    如果msg_prio不為NULL,函數返回消息的優先級
    如果隊列為空,調用將阻塞,如果隊列設置0_NONBLOCK,調用立即返回EAGAIN

    // 向隊列加入一條消息
    mqd_t mqd;
    char *msg;
    size_t len;
    unsigned int prio;
    len = 100;
    prio = 5;
    mqd = mq_open("/abc.123", O_WRONLY);
    msg = (char *)malloc(len);
    memset(msg, 0, len);
    mq_send(mqd, msg, len, prio);

    // 從隊列讀入一條消息
    mqd_t mqd;
    char *msg;
    size_t len;
    int n;
    unsigned int prio;
    struct mq_attr attr;
    mqd = mq_open("/abc.123", O_RDONLY);
    mq_getattr(mqd, &attr);
    len = attr.mq_msgsize;
    msg = (char *)malloc(len);
    memset(msg, 0, len);
    n = mq_receive(mqd, msg, len, &prio);
    printf("read %ld bytes, priority=%u\n", (long)n, prio);

    隊列限制
    long int open_max = sysconf(_SC_MQ_OPEN_MAX);  // -1
    long int prio_max = sysconf(_SC_MQ_PRIO_MAX);  // 32768

    消息通告
    當往空隊列放置了一個消息時,通知進程
    通告方式有2種:
    1. 產生一個信號
    2. 創建一個線程執行一個指定的函數
    mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
    成功返回0;失敗返回-1
    給隊列建立或者刪除異步事件通知
    1.如果notification非空,那么當前進程希望在有一個消息到達而且隊列先前為空時得到通知,該進程被注冊為接收該隊列的通知
    2.如果notification為空,而且當前進程目前被注冊為接收該隊列的通知,那么現有注冊將被撤銷
    3.任意時刻只有一個進程可以被注冊為接收隊列的通知
    4.當有一個消息到達一個空隊列,而且已經有一個進程被注冊為接收該隊列的通知時,只有在沒有任何線程阻塞在該隊列的mq_receive調用的前提下,通知才會發送。即在mq_receive調用中的阻塞比任何通知的注冊都優先
    5.當該通知已經發送給它的注冊進程時,其注冊即被撤銷。該進程必須再次調用mq_notify以重新注冊
    6.當調用mq_notify但是隊列不為空時,通知不會發送;當隊列變為空,并且有一個消息入隊時,才發送通知

    union sigval {                /* Data passed with notification */
    int     sival_int;        /* Integer value */
    void   *sival_ptr;        /* Pointer value */
    };

    struct sigevent {
    int    sigev_notify;      /* Notification method */
    int    sigev_signo;       /* Notification signal */
    union sigval sigev_value; /* Data passed with notification */
    void (*sigev_notify_function) (union sigval);
    /* Function for thread notification */
    void  *sigev_notify_attributes;
    /* Thread function attributes */
    };
    sigev_notify:SIGEV_NONE,SIGEV_SIGNAL,SIGEV_THREAD

    // 使用非阻塞mq_receive的信號通知
    volatile sig_atomic_t mqflag;
    static void sig_usr1(int);
    int main(int argc, char *argv[])
    {
    mqd_t mqd;
    void *buf;
    ssize_t n;
    sigset_t zeromask, newmask, oldmask;
    struct mq_attr attr;
    struct sigevent sigev;

    assert(argc == 2);
    mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
    mq_getattr(mqd, &attr);
    buf = malloc(attr.mq_msgsize);
    sigemptyset(&zeromask);
    sigemptyset(&newmask);
    sigemptyset(&oldmask);
    sigaddset(&newmask, SIGUSR1);
    signal(SIGUSR1, sig_usr1);
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);

    for ( ; ; ) {
    sigprocmask(SIG_BLOCK, &newmask, &oldmask);
    while (mqflag == 0)
    sigsuspend(&zeromask);
    mqflag = 0;
    mq_notify(mqd, &sigev);
    while ((n = mq_receive(mqd, buf, attr.mq_msgsize, NULL)) >= 0) {
    printf("read %ld bytes\n", (long)n);
    }
    if (errno != EAGAIN)
    die("mq_receive");
    sigprocmask(SIG_UNBLOCK, &newmask, NULL);
    }

    return 0;
    }
    static void sig_usr1(int signo)
    {
    mqflag = 1;
    return;
    }

    // 使用sigwait代替信號處理程序的信號通知
    #include <signal.h>
    int sigwait(const sigset_t *set, int *sig);
    成功返回0,并設置sig為收到的信號;失敗返回錯誤碼

    int main(...)
    {
    ...
    sigemptyset(&newmask);
    sigaddset(&newmask, SIGUSR1);
    sigprocmask(SIGBLOCK, &newmask, NULL);

    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);
    for ( ; ; ) {
    sigwait(&newmask, &signo);
    if (signo == SIGUSR1) {
    mq_notify(mqd, &sigev);
    while ((n = mq_receive(mqd, buf, len, NULL)) >=0) {
    printf("read %ld bytes\n", n);

    if (errno != EAGAIN)
    die("mq_receive");
    }
    }
    ...
    }

    // 使用select的POSIX消息隊列
    int pfds[2];
    static void sig_usr1(int);
    int main(int argc, char *arg[])
    {
    int fds;
    char c;
    fd_set rfds;
    mqd_t mqd;
    void *buf;
    ssize_t n;
    size_t len;
    struct mq_attr attr;
    struct sigevent sigev;

    asset(argc == 2);
    mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
    mq_getattr(mqd, &attr);
    len = attr.mq_msgsize;
    buf = malloc(len);
    pipe(pfds);
    // 設置信號處理程序,建立通知
    signal(SIGUSR1, sig_usr1);
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);
    FD_ZERO(&rfds);
    for ( ; ; ) {
    FD_SET(pfds[0], &rfds);
    nfds = select(pfds[0]+1, &rfds, NULL, NULL, NULL);
    if (FD_ISSET(pfds[0], &rfds)) { // 管道可讀
    read(pfds[0], &c, 1);
    mq_notify(mqd, &sigev);
    while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
    printf("read %ld bytes\n", (long)n);
    }
    if (errno != EAGAIN)
    die("mq_receive");
    }
    }
    return 0;
    }

    static void sig_usr1(int signo)
    {
    write(pfds[1], "", 1); // 異步信號處理安全的函數
    return;
    }

    // 收到通知后,啟動一個線程,接收消息,然后結束進程
    #include <pthread.h>
    #include <mqueue.h>
    #include <assert.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #define die(msg) { perror(msg); exit(EXIT_FAILURE); }
    static void tfunc(union sigval sv)   /* Thread start function */
    {
    struct mq_attr attr;
    ssize_t nr;
    void *buf;
    mqd_t mqdes = *((mqd_t *) sv.sival_ptr);
    /* Determine max. msg size; allocate buffer to receive msg */
    if (mq_getattr(mqdes, &attr) == -1) die("mq_getattr");
    buf = malloc(attr.mq_msgsize);
    if (buf == NULL) die("malloc");
    nr = mq_receive(mqdes, buf, attr.mq_msgsize, NULL);
    if (nr == -1) die("mq_receive");
    printf("Read %ld bytes from MQ0\n", (long) nr);
    free(buf);
    exit(EXIT_SUCCESS);         /* Terminate the process */
    }

    int main(int argc, char *argv[])
    {
    mqd_t mqdes;
    struct sigevent not;
    assert(argc == 2);
    mqdes = mq_open(argv[1], O_RDONLY);
    if (mqdes == (mqd_t) -1) die("mq_open");
    not.sigev_notify = SIGEV_THREAD;
    not.sigev_notify_function = tfunc;
    not.sigev_notify_attributes = NULL;
    not.sigev_value.sival_ptr = &mqdes;   /* Arg. to thread func. */
    if (mq_notify(mqdes, &not) == -1) die("mq_notify");
    pause();    /* Process will be terminated by thread function */
    return 0;
    }          

    // 啟動一個新線程
    mqd_t mqd;
    struct mq_attr attr;
    struct sigevent sigev;
    static void notify_thread(union sigval);
    int main(int argc, char *argv[])
    {
    assert(argc == 2);
    mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
    mq_getattr(mqd, &attr);
    sigev.sigev_notify = SIGEV_THREAD;
    sigev.sigev_value.sival_ptr = NULL;
    sigev.sigev_notify_function = notify_thread;
    sigev.sigev_notify_attributes = NULL;
    mq_notify(mqd, &sigev);
    for ( ; ; )
    pause();
    return 0;
    }
    static void notify_thread(union sigval arg)
    {
    ssize_t n;
    size_t len;
    void *buf;
    len = attr.mq_msgsize;
    printf("notify_thread started\n");
    buf = malloc(len);
    mq_notify(mqd, &sigev);
    while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
    printf("read %ld bytes\n", (long)n);
    }
    if (errno != EAGAIN)
    die("mq_receive");
    free(buf);
    pthread_exit(NULL);
    }

    POSIX實時信號
    unix信號分為兩大組:
    實時信號:SIGRTMIN--SIGRTMAX
    其他信號:SIGINT, SIGQUIT, SIGKILL, ...

    信號的實時行為取決于SA_SIGINFO
    實時行為包含以下特征:
    1.信號是排隊的,即如果一個信號產生了3次,它就遞交3次。以FIFO的順序排隊
    2.當有多種SIGRTMIN到SIGRTMAX范圍內的解阻塞信號排隊時,值較小的信號先于值較大的信號遞交(注意:linux與此相反)
    3.當某個非實時信號遞交時,傳遞給它的信號處理的唯一參數是該信號的值,實時信號比其他信號傳遞更多的信息
    4.有些新函數使用實時信號工作,如sigqueue用來代替kill

    // 查看實時信號的遞交順序
    static void sig_rt(int, siginfo_t *, void *);
    int main(void)
    {
    int i, j;
    pid_t pid;
    sigset_t newset;
    union sigval val;
    printf("SIGRTMIN=%d, SIGRTMAX=%d\n", (int)SIGRTMIN, (int)SIGRTMAX);
    pid = fork();
    if (pid < 0) die("fork");
    else if (pid == 0) {
    /* 阻塞3個實時信號 */
    sigemptyset(&newset);
    sigaddset(&newset, SIGRTMIN);
    sigaddset(&newset, SIGRTMIN+1);
    sigaddset(&newset, SIGRTMIN+2);
    sigprocmask(SIG_BLOCK, &newset, NULL);
    signal_rt(SIGRTMIN, sig_rt);
    signal_rt(SIGRTMIN+1, sig_rt);
    signal_rt(SIGRTMIN+2, sig_rt);
    sleep(6);
    sigprocmask(SIG_UNBLOCK, &newset, NULL);
    sleep(3);
    exit(0);
    }
    else {
    sleep(3);
    for (i=SIGRTMIN; i<=SIGRTMIN+2; i++) {
    for (j=0; j<=2; j++) {
    val.sival_int = j;
    sigqueue(pid, i, val);
    printf("send signal signo=%d, val=%d\n", i, j);
    }
    }
    exit(0);
    }
    }

    static void sig_rt(int signo, siginfo_t *info, void *context)
    {
    printf("receive signal signo=%d, code=%d, ival=%d\n",
    signo, info->si_code, info->si_value.sival_int);
    }
    typedef void sigfunc_rt(int, siginfo_t *, void *);
    sigfunc_rt *signal_rt(int signo, sigfunc_rt *func)
    {
    struct sigaction act, oact;
    act.sa_sigaction = func;
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_SIGINFO; /* 實時信號必須指定 */
    if (signo == SIGALRM) {
    #ifdef    SA_INTERRUPT
    act.sa_flags |= SA_INTERRUPT;
    #endif        
    }
    else {
    #ifdef    SA_RESTART
    act.sa_flags |= SA_RESTART;
    #endif
    }
    if (sigaction(signo, &act, &oact) < 0)
    return (sigfunc_rt *)SIG_ERR;
    else
    return oact.sa_sigaction;
    }
    輸出如下:
    [root@jiangkun unp]# ./rtsig 
    SIGRTMIN=34, SIGRTMAX=64
    send signal signo=34, val=0
    send signal signo=34, val=1
    send signal signo=34, val=2
    send signal signo=35, val=0
    send signal signo=35, val=1
    send signal signo=35, val=2
    send signal signo=36, val=0
    send signal signo=36, val=1
    send signal signo=36, val=2
    receive signal signo=36, code=-1, ival=0
    receive signal signo=36, code=-1, ival=1
    receive signal signo=36, code=-1, ival=2
    receive signal signo=35, code=-1, ival=0
    receive signal signo=35, code=-1, ival=1
    receive signal signo=35, code=-1, ival=2
    receive signal signo=34, code=-1, ival=0
    receive signal signo=34, code=-1, ival=1
    receive signal signo=34, code=-1, ival=2

    struct sigaction {
    void (*sa_handler)(int);
    void (*sa_sigaction)(int, siginfo_t *, void *);
    sigset_t sa_mask;
    int sa_flags;
    void (*sa_restorer)(void); /* 被遺棄了! */
    };

    實時信號之所以是可靠的,因為在進程阻塞該信號的時間內,發給該進程的所有實時信號會排隊,而非實時信號則會合并為一個信號。早期的kill函數只能向特 定的進程發送一個特定的信號,并且早期的信號處理函數也不能接受附加數據。siqueue和sigaction解決了這個問題。
    下面這個例子中,進程先屏蔽SIGINT和SIGRTMIN兩個信號,其中SIGINT是非實時信號,而SIGRTMIN為實時信號,接著進程睡眠,睡眠完成之后再接觸對這兩個信號的屏蔽,此時可以比較對兩種信號的處理方式是否一樣。
    #include <stdio.h>
    #include <string.h>
    #include <signal.h>
    #include <unistd.h>
    void sig_handler(int, siginfo_t*, void*);
    int main(int argc,char *argv[])
    {
    struct sigaction act;
    sigset_t newmask, oldmask;
    int rc;    
    sigemptyset(&newmask);
    /* 往信號集中添加一個非實時信號 */
    sigaddset(&newmask, SIGINT);
    /* 往信號集中添加一個實時信號 */
    sigaddset(&newmask, SIGRTMIN);
    /* 屏蔽實時信號SIGRTMIN */
    sigprocmask(SIG_BLOCK, &newmask, &oldmask);
    act.sa_sigaction = sig_handler;
    act.sa_flags = SA_SIGINFO;
    if(sigaction(SIGINT, &act, NULL) < 0) {
    printf("install signal error\n");
    }
    if(sigaction(SIGRTMIN, &act, NULL) < 0) {
    printf("install signal error\n");
    }
    printf("pid = %d\n", getpid());
    /* 進程睡眠,在此時間內的發給該進程的所有實時信號 將排隊,不會有信號丟失 */
    sleep(20);    
    /* 解除對SIGRTMIN信號的屏蔽,信號處理函數將會被調用 */
    sigprocmask(SIG_SETMASK, &oldmask, NULL);
    return 0;
    }
    void sig_handler(int signo, siginfo_t *info, void *context)
    {
    if(signo == SIGINT)
    printf("Got a common signal\n");
    else
    printf("Got a real time signal\n");
    }

    將程序編譯好之后,再開一個終端用于發送實時信號。
    # ./sigqueue_receive 
    pid = 8871
    進程開始睡眠……
    在新的終端輸入:
    ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
    ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
    ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
    ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
    連續發送四個SIGRTMIN,接著回到之前的終端,連續四次按下"ctrl+c"。
    ^C^C^C^C
    最后進程終于醒來,整個輸出如下:
    pid = 8871
    ^C^C^C^CGot a real time signal
    Got a real time signal
    Got a real time signal
    Got a real time signal
    Got a common signal
    果然接受到四個實時信號,并且四次調用了信號處理函數,而對于SIGINT,雖然也按下了四次"ctrl+c",但是進程對其只做一次處理。這個例子中是先發實時信號后發非實時信號,所以信號處理函數先處理實
    時信號,如果只是按照順序注冊信號的話,這很好理解,但是換一下,先按下了四次"ctrl+c"然后使用kill發四次實時信號,結果發現輸出的結果仍然 一樣,這說明實時信號的優先級比非實時信號要高,內核每個進程的信號組成一個雙向鏈表,實時信號插入的時候就不是隨便插在尾部了。
    信號的優先級:信號實質上是軟中斷,中斷有優先級,信號也有優先級。如果一個進程有多個未決信號,則對于同一個未決的實時信號,內核將按照發送的順序來遞 交信號。如果存在多個未決的實時信號,則值(或者說編號)越大的越先被遞送。如果既存在不可靠信號,又存在可靠信號(實時信號),雖然POSIX對這一情 況沒有明確規定,但Linux系統和大多數遵循POSIX標準的操作系統一樣,將優先遞交可靠信號。一個進程如果處理 SIGQUIT(3),SIGINT(2),SIGHUP(1)(通過"kill -l" 可以查看信號的編號),那么先后給該進程發送SIGINT,SIGHUP,SIGQUIT,處理的順序會是SIGQUIT,SIGINT,SIGHUP, 不論改變這個三個信號的發送順序,處理的順序都是一樣的。

    RFID管理系統集成商 RFID中間件 條碼系統中間層 物聯網軟件集成
    最近免费观看高清韩国日本大全