POSIXメッセージキューの使い方
以前,当サイトで公開していたエントリーのリライトになります.
Unix系のシステムではSystemVとPOSIXのメッセージキューを使用することができる場合が多く,お手軽にIPCを使いたい場合このあたりが選択肢になるのではないかと思います.(とはいえ,いろいろ考慮してちゃんと実装しようとすると結構大変なのでプロダクトへの採用は慎重に...)
カーネルの機能として実装されているためWSL(2じゃない方)では使えなかったりするのでそのあたりは注意が必要です. なお,ソースコードはgithubでも公開しています.
基本的な使い方
POSIXメッセージキューを使うにはヘッダ mqueue.h をインクルードし, librt.so をリンクします.
くわしくは
$man mq_overview
mq_open
メッセージキューをオープンします.
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag);
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag, mode_t mode,
struct mq_attr *attr);
作成/オープンに成功すると戻り値にキューディスクリプタの番号が入ります. 失敗した場合は-1が返り,errno にエラー番号が入ります.
name
キューの名前を指定します.キューの名前は / ではじまり / 以外の文字を最低1文字含む文字列でなくてはなりません.
oflag
fcntl.h で定義されている制御フラグを指定します. * O_RDONLY : 受信専用 * O_WRONLY : 送信専用 * O_RDWR : 送受信 上記フラグとORで以下のフラグも指定可能です. * O_NONBLOCK : ノンブロッキングモード * O_CREAT : キューが存在しない場合は作成 * O_EXCL : すでに同名のキューが存在して O_CREAT が指定されている場合はエラーにする
mode
oflag に O_CREAT を指定した場合,追加でこのパラメータが必要になります. modeパラメータは sys/stat.h の定義値を使用できる他,0600 のように数字での指定も可能です. くわしくは
$man 2 open
attr
mq_attr 構造体で作成するキューの属性を指定できます.NULLの場合はデフォルト設定になります. デフォルト設定は /proc/sys/fs/mqueue/* に書かれています. くわしくは
$man mq_getattr
mq_send, mq_timedsend
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned int msg_prio);
#include <time.h>
#include <mqueue.h>
int mq_timedsend(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned int msg_prio,
const struct timespec *abs_timeout);
mqdes, *msg_ptr
ディスクリプタ mqdes のキューに msg_ptr のメッセージを送ります. くわしくは
$man mq_send
msg_len
msg_ptr が指すメッセージのサイズを指定します.キューの属性 mq_msgsize 以下のサイズでなければなりません.
msg_prio
メッセージの優先度を指定できます.小さいほど優先度が高く,同じ優先度のメッセージは到着順に処理されます.
abs_timeout
タイムアウトの時間を指定できます.タイムアウトした場合はエラーとして関数が返ります.キューがBLOCKINGモードである必要があります. タイムアウトはtimespec構造体で指定します.
struct timespec {
time_t tv_sec; /* 秒 */
long tv_nsec; /* ナノ秒 */
};
時刻設定が飛ぶと想定外のタイムアウトが発生したり,タイムアウトが発生しなかったりする場合があるため,poll などで監視して monotonic な時間でタイムアウトする実装にした方が良いです.mqueueに限った話ではありませんが,組み込み機器では起動してから外部のソースから時刻情報を受けたり,ユーザが手動で設定したりと,システム時刻が変わるユースケースがよくあるため,タイムアウトをrealtimeクロックに頼るのは危険です.こういったシステムコールにしても,内部的に使用しているのがmonotonicかどうか気を配る必要がありますね.
ちなみに,一昔前までは組み込みシステムによく使われていた印象のあるQNXには,この問題に対応した mq_timedsend_monotonic という便利なシステム関数があります.
mq_receive, mq_timedreceive
メッセージキューからメッセージを受け取ります.
#include <mqueue.h>
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned int *msg_prio);
#include <time.h>
#include <mqueue.h>
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned int *msg_prio,
const struct timespec *abs_timeout);
ディスクリプタ mqdes のキューからメッセージを引き抜いて msg_ptr に格納します. 引き抜いたメッセージはキューから削除されます.
各引数は上記 mq_send と同じような感じになります.
実装例
タイムアウトなし版
例として sender は hello を10回送りつけてからcloseを送る.receiver は 送られてきたメッセージを出力する.close が送られてきたらunlinkしてプログラムを終了する.というサンプルプラグラムを実装してみます.
sender.c
#include <mqueue.h>
#include <stdio.h>
#include <unistd.h>
int main()
{
int i;
char* buff;
mqd_t send_que;
send_que = mq_open("/mq_sample", O_WRONLY|O_CREAT, 0644, NULL);
if(-1 == send_que)
{
perror("mq_open error");
}
else
{
for(i=0; i<10; i++)
{
buff = "hello";
printf("send msg: %s \n", buff);
if(-1 == mq_send(send_que, buff, sizeof(buff), 0))
{
perror("error");
}
sleep(1);
}
buff = "close";
printf("send msg: %s \n", buff);
if(-1 == mq_send(send_que, buff, sizeof(buff), 0))
{
perror("error");
}
}
return 0;
}
receiver.c
#include <mqueue.h>
#include <stdio.h>
#include <string.h>
int main()
{
int i;
char buff[10];
struct mq_attr attr;
mqd_t recv_que;
recv_que = mq_open("/mq_sample", O_RDONLY|O_CREAT, 0644, NULL);
mq_getattr(recv_que, &attr);
if(-1 == recv_que)
{
perror("mq_open error");
}
else
{
while(1)
{
if(-1 == mq_receive(recv_que, buff, attr.mq_msgsize, NULL))
{
perror("mq_receive error");
break;
}
printf("received msg: %s \n", buff);
if(0 == strcmp(buff, "close"))
{
break;
}
}
}
printf("close and unlink que\n");
mq_close(recv_que);
mq_unlink("/mq_sample");
return 0;
}
コンパイル方法
$gcc sender.c -lrt -o sender
$gcc receiver.c -lrt -o receiver
実行方法
$./receiver &
$./sender
※それぞれ別の端末から実行でもOK
タイムアウトつき
mq_timedsend / mq_timedreceive を使用する場合,clock_gettime でシステムの現在時刻を取得します. 取得した現在時刻に任意の時間を足すことでタイムアウトを設定します.
mq_timedsendはキューがフルでブロッキング中にタイムアウト時間を迎えると関数を抜けてきます. たとえばreceiver側のプロセスが死んでいるとか,実際にありそうな故障モードを想定したとき,ブロックしたままだと都合が悪いため適当にタイムアウトさせたいといったケースが考えられます.
mq_timedreceive は,キューに新しいメッセージが来ず,ブロッキング中にタイムアウト時間を迎えると関数を抜けてきます.たとえばheartbeat監視のような機構があって,メッセージ待ち状態でも一定時間に一度は関数を抜けてきて欲しいといったケースが考えられます.
上述の通り CLOCK_REALTIME を使って時間監視をするため * システム時間が未来に飛ぶと即座にタイムアウトしてしまう * システム時間が過去に飛ぶと長時間タイムアウトしない といった問題が発生するため,組込システムにおいては基本的に使用すべきではないと思います.
sender.c
#include <mqueue.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>
int main()
{
int i;
char* buff;
mqd_t send_que;
send_que = mq_open("/mq_timed_sample", O_CREAT|O_WRONLY, 0644, NULL);
if(-1 == send_que)
{
perror("mq_open error");
}
else
{
struct timespec timeout;
for(i=0; i<11; i++)
{
if(-1 == clock_gettime(CLOCK_REALTIME, &timeout))
{
perror("clock_gettime error");
}
timeout.tv_sec += 10;
if( i < 10) { buff = "hello"; }
else { buff = "close"; }
printf("send msg: %s \n", buff);
if(-1 == mq_timedsend(send_que, buff, sizeof(buff), 0, &timeout))
{
perror("mq_timedsend error");
}
sleep(1);
}
}
return 0;
}
receiver.c
#include <mqueue.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <errno.h>
int main()
{
int i;
char buff[10];
struct mq_attr attr;
mqd_t recv_que;
recv_que = mq_open("/mq_timed_sample", O_RDONLY|O_CREAT, 0644, NULL);
mq_getattr(recv_que, &attr);
struct timespec timeout;
if(-1 == recv_que)
{
perror("mq_open error");
}
else
{
struct timespec timeout;
while(1)
{
if(-1 == clock_gettime(CLOCK_REALTIME, &timeout))
{
perror("error");
}
timeout.tv_sec += 1;
if(-1 == mq_timedreceive(recv_que, buff, attr.mq_msgsize, NULL, &timeout))
{
if(errno == ETIMEDOUT)
{
printf("Timeout!\n");
continue;
}
else
{
perror("mq_receive error");
break;
}
}
printf("received msg: %s \n", buff);
if(0 == strcmp(buff, "close"))
{
break;
}
}
}
printf("close and unlink que\n");
mq_close(recv_que);
mq_unlink("/mq_timed_sample");
return 0;
}
poll によるタイムアウトつき
mqのファイルディスクリプタをepollで監視できます.epoll_waitのtimeoutはMONOTONICを使用しているのでCLOCK_REALTIMEを使用したmq_timed*()で発生する問題を回避できます.実際にプロダクトなどで使用する場合はこちらを使うのが賢明な判断でしょう.
まともにエラー処理をやろうと思うと考えなければならないことがけっこうあるので大変です. epollについては機会があればもうちょっと深耕したいと思っています.
TODO:サンプルコードもうちょっとマシにしたい
sender.c
#include <mqueue.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <unistd.h>
int main()
{
int i;
char* buff;
mqd_t send_que;
send_que = mq_open("/mq_epoll_sample", O_CREAT|O_WRONLY|O_NONBLOCK, 0644, NULL);
if(-1 == send_que)
{
perror("mq_open error");
}
else
{
int epfd = epoll_create(1);
if(-1 == epfd)
{
perror("epoll_create error");
return 1;
}
struct epoll_event send_event, event;
send_event.data.fd = send_que;
send_event.events = EPOLLOUT;
if(-1 == epoll_ctl(epfd, EPOLL_CTL_ADD, send_que, &send_event))
{
perror("epoll_ctl");
close(epfd);
return 1;
}
for(i=0; i<11; i++)
{
if( i < 10) { buff = "hello"; }
else { buff = "close"; }
printf("send msg: %s \n", buff);
while(1)
{
int evt_count = epoll_wait(epfd, &event, 1, 1000);
if(evt_count < 0)
{
perror("epoll_wait");
return -1;
}
else if(evt_count == 0)
{
printf("Timeout!\n");
continue;
}
else
{
if(-1 == mq_send(send_que, buff, sizeof(buff), 0))
{
perror("error");
continue;
}
printf("Sent Message #%d \n", i);
break;
}
}
sleep(1);
}
}
return 0;
}
receiver.c
#include <mqueue.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>
int main()
{
int i;
char buff[10];
struct mq_attr attr;
mqd_t recv_que;
recv_que = mq_open("/mq_epoll_sample", O_RDONLY|O_CREAT|O_NONBLOCK, 0644, NULL);
mq_getattr(recv_que, &attr);
if(-1 == recv_que)
{
perror("mq_open error");
}
else
{
// 1: Create epoll fd
// epoll_create() の引数は後方互換のためのものであり最近のカーネルだと意味は特にない
// 詳細は man epoll
int epfd = epoll_create(1);
if(-1 == epfd)
{
perror("epoll_create1 error");
return 1;
}
// 2: Add fd for epoll to watch
// fd recv_que の POLLIN を監視する
struct epoll_event recv_event, event;
recv_event.data.fd = recv_que;
recv_event.events = EPOLLIN;
if(-1 == epoll_ctl(epfd, EPOLL_CTL_ADD, recv_que, &recv_event))
{
perror("epoll_ctl");
close(epfd);
return 1;
}
while(1)
{
printf("Start epoll\n");
int evt_count = epoll_wait(epfd, &event, 1, 3000);
// queにメッセージが入っていれば抜けてくる
if(evt_count < 0)
{
perror("epoll_wait error");
break;
}
else if(evt_count == 0)
{
printf("Timeout!\n");
continue;
}
// 入っているメッセージ全部抜く
while(-1 != mq_receive(recv_que, buff, attr.mq_msgsize, NULL))
{
printf("received msg: %s \n", buff);
if(0 == strcmp(buff, "close"))
{
goto end;
}
}
}
end:
close(epfd);
}
printf("close and unlink que\n");
mq_close(recv_que);
mq_unlink("/mq_epoll_sample");
return 0;
}