#include <mqueue.h> mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
notification 引き数は sigevent 構造体へのポインタである。 sigevent 構造体は以下のような感じで定義されている:
union sigval { /* Data passed with notification */ int sival_int; /* Integer value */ void *sival_ptr; /* Pointer value */ }; struct sigevent { int sigev_notify; /* Notification method */ int sigev_signo; /* Notification signal */ union sigval sigev_value; /* Data passed with notification */ void (*sigev_notify_function) (union sigval); /* Function for thread notification */ void *sigev_notify_attributes; /* Thread function attributes */ };
notification が NULL でないポインタであれば、 mq_notify() はメッセージ通知を受け取るように呼び出し元のプロセスを登録する。 notification が指す sigevent の sigev_notify フィールドは、どのような通知を行うのかを指定する。 このフィールドは以下の値のいずれかを持つ。
一つのメッセージキューから通知を受信するように登録できるプロセスは 一つだけである。
notification が NULL で、かつ呼び出し元のプロセスがこのメッセージキューからの 通知を受信するに現在登録している場合、登録を削除する。 これ以降、別のプロセスがこのメッセージキューから通知を受信するように 登録できるようになる。
メッセージ通知は、それまで空のキューに新しいメッセージが到着した 場合にのみ行われる。 mq_notify() が呼び出された時にそのキューが空でない場合、 そのキューが空になり、その後新しいメッセージが到着した時に 初めて通知が行われることになる。
別のプロセスやスレッドが mq_receive(3) を使って、空のキューからメッセージの読み出しを待っている場合、 メッセージ通知の登録は全て無視される。 メッセージは mq_receive(3) を呼び出しているプロセスやスレッドに配送され、 メッセージ通知の登録は効力を持ったままとなる。
通知は一度だけ行われる。通知が送られた後は、通知要求の登録は削除され、 別のプロセスがメッセージ通知を受信するように登録できるようになる。 通知を受けたプロセスが次の通知も受信したい場合は、 mq_notify() を使ってその後の通知も受けるように要求することができる。 mq_notify() を再度呼び出すのは、読み出していないメッセージを全部読み出して キューが空になる前にすべきである (キューからのメッセージ読み出しをキューが空になった時に 停止 (block) せずに行うには、キューを非停止モード (non-blocking mode) に設定しておくとよい)。
#include <pthread.h> #include <mqueue.h> #include <assert.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #define die(msg) { perror(msg); exit(EXIT_FAILURE); } static void /* スレッド開始関数 */ tfunc(union sigval sv) { struct mq_attr attr; ssize_t nr; void *buf; mqd_t mqdes = *((mqd_t *) sv.sival_ptr); /* 最大メッセージサイズを決定し、 メッセージ受信用のバッファを確保する */ if (mq_getattr(mqdes, &attr) == -1) die("mq_getattr"); buf = malloc(attr.mq_msgsize); if (buf == NULL) die("malloc"); nr = mq_receive(mqdes, buf, attr.mq_msgsize, NULL); if (nr == -1) die("mq_receive"); printf("Read %ld bytes from MQ\n", (long) nr); free(buf); exit(EXIT_SUCCESS); /* プロセスを終了する */ } int main(int argc, char *argv[]) { mqd_t mqdes; struct sigevent not; assert(argc == 2); mqdes = mq_open(argv[1], O_RDONLY); if (mqdes == (mqd_t) -1) die("mq_open"); not.sigev_notify = SIGEV_THREAD; not.sigev_notify_function = tfunc; not.sigev_notify_attributes = NULL; not.sigev_value.sival_ptr = &mqdes; /* スレッド関数に渡す引き数 */ if (mq_notify(mqdes, ¬) == -1) die("mq_notify"); pause(); /* プロセスはスレッド関数により終了される */ }