컴퓨터/운영체제(주로 리눅스)

[리눅스] POSIX 메시지 큐 사용 방법 - 예제 코드

REAKWON 2022. 4. 4. 21:23

메시지큐, 공유메모리 등 더 많은 정보와 예제코드를 담은 리눅스 교재를 배포했습니다. 아래의 페이지에서 리눅스 교재를 받아가세요.

https://reakwon.tistory.com/233

 

리눅스 프로그래밍 note 배포

티스토리에 리눅스에 관한 내용을 두서없이 여지껏 포스팅했었데요. 저도 제 포스팅을 찾기가 어렵기도 하고 티스토리에서 코드삽입을 하게 되면 이게 일자로 쭉 쓰여져있는 x같은 현상이 생겨

reakwon.tistory.com

 

Message Queue

프로세스 간 통신 방식인 IPC기법 중 하나인 메시지 큐는 두 가지 사용 방법이 있습니다. msgget, msgsend, msgrecv와 같은 함수를 사용하는 방식인 System V 방식의 메시지큐, 그리고 지금 알아볼 mq_open, mq_send, mq_recv와 같은 함수를 사용하는 방식인 POSIX 방식이 있습니다. 

System V의 메시지 큐를 사용하는 방법과 예제를 보시려면 아래의 포스팅을 방문하여 확인해보세요.

https://reakwon.tistory.com/97

 

[리눅스] 메시지 큐(message queue) 개념과 예제(msgget, msgsnd, msgrcv, msgctl)

메시지 큐 IPC기법 중 하나인 메시지큐는 Data Structure 중 큐를 사용합니다. 기본적으로는 먼저온 메시지가 먼저 꺼내어집니다. 메시지큐의 msgtype에 따라 특정 메시지 중 가장 먼저들어온 메시지도

reakwon.tistory.com

 

System V는 옛날 유닉스 방식으로 보시면 되구요. 이후에 나온것이 POSIX입니다. POSIX는 Portable Operating System Interface라고 해서 이식 가능 운영체제 인터페이스라고 합니다. 리눅스가 POSIX를 준수하는 운영체제이구요. 

우선 다음의 헤더파일들이 필요합니다. 

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>

 

컴파일할때는 -lrt로 링크를 걸어줘야하구요. 아래에서 코드를 실행하고 컴파일할때 다 설명할겁니다.

1) mq_open

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

 

메시지 큐를 여는 함수로 2개가 존재하네요. mode와 메시지 큐의 속성을 정의하는 attr도 같이 넘겨줄수 있는 함수가 따로 존재합니다. 

name : 메시지 큐 이름을 지정합니다.

oflag : 메시지큐에 옵션을 줄 수 있습니다. 아래와 같은 flag들이 존재합니다. 아래의 flag를 쓰려고 fcntl.h 파일을 include해야하는 겁니다.

flag 설명
O_RDONLY 읽기(메시지 받기:recv) 전용으로 메시지 큐를 엽니다. 
O_WRONLY 쓰기(메시지 전송:send) 전용으로 메시지 큐를 엽니다.
O_RDWR 읽기, 쓰기 전용으로 메시지 큐를 엽니다.
O_CLOEXEC close-on-exec이라고 하여, exec시 메시지큐를 닫습니다.
O_CREAT 메시지 큐가 존재하지 않으면 메시지큐를 생성합니다. 이 플래그를 사용하려면 두가지 인자가 추가로 더 필요한데 그게 바로 mode와 attr입니다.
O_EXCL O_CREAT과 같이 사용하여 이미 파일이 존재하면 EEXIST에러를 발생시킵니다.
O_NONBLOCK 비블록 형식으로 메시지큐를 엽니다. mq_send나 mq_receive시에 메시지가 없으면 지속적으로 기다리게 되는데, 이 플래그를 사용하면 기다리지 않습니다.

 

mode : O_CREAT과 같이 사용합니다. 메시지큐를 만들때 권한을 설정합니다.

attr : mq_attr의 포인터로 다음과 같이 정의되어있습니다. 메시지큐의 속성을 지정하는데, 최대 메시지수, 최대 메시지 사이즈(바이트) 등을 정할 수 있습니다.

 struct mq_attr {
       long mq_flags;       /* Flags (ignored for mq_open()) */
       long mq_maxmsg;      /* Max. # of messages on queue */
       long mq_msgsize;     /* Max. message size (bytes) */
       long mq_curmsgs;     /* # of messages currently in queue
                                       (ignored for mq_open()) */
};

반환 : 만약 메시지큐가 올바르게 생성되었다면 메시지큐를 제어할 수 있는 mqd_t가 반환됩니다. 우리는 이것을 통해서 메시지를 주고 받을 수 있습니다.

 

2) mq_send, mq_timedsend

int mq_send(mqd_t mqdes, const char *msg_ptr,nsize_t msg_len, unsigned int msg_prio);

#include <time.h>
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio,
                     const struct timespec *abs_timeout);

 

메시지를 보내는 함수는 두가지가 존재합니다. 

mqdes : mq_open시에 받은 mqd_t입니다. 

msg_ptr : char형 메시지 포인터입니다. 메시지 내용을 의미합니다.

msg_len : 이름에서 알 수 있듯이 메시지의 크기를 의미합니다.

msg_prio : 메시지의 우선순위를 지정합니다.

abs_timeout : 지정된 시간동안 메시지 전송을 보류합니다. 큐가 꽉 찼을 경우가 있을 경우에 말이죠. NON_BLOCK 플래그가 없어야합니다. timespec 구조체는 아래와 같습니다. 이 구조체를 사용하기 위해서 time.h가 필요합니다.

struct timespec {
       time_t tv_sec;        /* seconds */
       long   tv_nsec;       /* nanoseconds */
};

 

반환 : 성공시 0, 실패시 -1을 반환합니다.

 

3) mq_receive, mq_timedreceive

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);

#include <time.h>
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio,
                          const struct timespec *abs_timeout);

 

자, 메시지를 보냈으니까 받아야겠죠. mq_send와 받는다는 것 외에는 모두 똑같습니다. msg_ptr에 메시지가 들어오게 됩니다. 그리고 mq_timedreceive를 통해서 역시 메시지가 쌓일때까지 지정된 시간만큼 기다릴수 있습니다.

반환 : 만약 올바르게 받았다면 실제로 받은 메시지의 사이즈가 반환되고 실패할 경우 -1이 반환됩니다. 

혹시 Message too long 에러가 발생한다면, 이것은 mq_receive 계열 함수에서 발생할 수 있는 에러인데, mq_receive의 man 페이지에서는 아래와 같이 설명하고 있습니다. mq_receive의 msg_len의 값은 attribute에서 설정한 mq_msgsize보다 작으면 안됩니다. (msg_len >= mq_msgsize)

 EMSGSIZE
              msg_len was less than the mq_msgsize attribute of the message queue.

 

4) mq_close

int mq_close(mqd_t mqdes);

 

mqdes : 닫을 mq를 지정합니다.

반환 : 성공시 0, 실패시 -1을 반환합니다. 

 

mq 기본예제

다음의 server.c는 메시지큐를 열고 받은 메시지를 출력하는 아주 간단한 역할을 합니다.

server.c

#include <stdlib.h>
#include <fcntl.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>

int main()
{
    struct mq_attr attr;
    //attr 때문에 에러 발생할 수 있음
    attr.mq_maxmsg = 20;
    attr.mq_msgsize = 128;
    char buf[128] = {0,};

    mqd_t mq;

    mq = mq_open("/reakwon_mq", O_RDWR | O_CREAT,  0666, &attr);
    if (mq == -1)
    {
            perror("message queue open error");
            exit(1);
    }

    if((mq_receive(mq, buf, attr.mq_msgsize,NULL)) == -1){
            perror("mq_receive error");
            exit(-1);
    }
    printf("mq received : %s\n", buf);
    mq_close(mq);
}

 

그리고 client.c는 메시지큐를 열고 메시지를 보내는 아주 간단한 프로그램이죠.

 

client.c

#include <stdlib.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <string.h>
int main(int argc, char **argv)
{
    struct mq_attr attr;
    //attr때문에 에러가 발생할 수 있음
    attr.mq_maxmsg = 20;
    attr.mq_msgsize = 128;
    mqd_t mq;
    char buf[128] = {0,};

    mq = mq_open("/reakwon_mq", O_WRONLY, 0666, &attr);
    if(mq == -1){
        perror("open error");
        exit(0);
    }

    scanf("%s", buf);
    if((mq_send(mq, buf, strlen(buf), 1)) == -1){
            perror("mq_send error");
            exit(-1);
    }
    mq_close(mq);
}

 

server.c와 client.c의 메시지큐 이름은 같아야합니다. 그리고 아래와 같이 컴파일해보도록 하죠. 아까 -lrt로 링크걸어줘야한다고 했죠?

# gcc server.c -o server -lrt
# gcc client.c -o client -lrt

 

그리고 server를 백그라운드로 실행시키고, client를 실행하여 메시지를 보내려고 하는 순간, 리눅스 세계는 그렇게 호락호락하지가 않죠. 아마도 아래의 에러 메시지가 발생할 수가 있습니다.

Invalid argument!

 

리눅스 시스템마다 message queue의 메시지 최대 사이즈라던가 큐의 크기가 정해져있는 한계(limit)이 있습니다. 어떻게 볼 수 있을까요? 아래의 경로에서 파일을 cat으로 확인할 수 있습니다.

/proc/sys/fs/mqueue/

 

여기서 메시지큐의 최대 크기와 메시지의 최대 크기를 볼까요? 

# cat /proc/sys/fs/mqueue/msgsize_max
8192
# cat /proc/sys/fs/mqueue/msg_max
10

 

위처럼 각 메시지의 최대 사이즈는 8192, 큐의 최대 들어갈 수 있는 메시지는 10개 입니다. 그래서 invalid argument 에러가 발생한 건데, 방법은 두 가지 입니다. 1. attr을 아래와 같이 시스템 limit에 맞게 수정하던지, 아니면 2. /proc/sys/fs/mqueue 안의 파일 내용을 강제로 고치던지 말이죠. 

1. attr 수정

attr.mq_maxmsg = 10;
attr.mq_msgsize = 8192;
char buf[8192] = {0,};

 

2. 루트 권한으로 아래 명령어 수행

echo 20 > /proc/sys/fs/mqueue/msg_max

 

이후에 컴파일 후 다시 수행해보시기 바랍니다.

# ./server &
[3] 17126
# ./client
hello?
mq received : hello?

 

서버는 받은 메세지를 정상적으로 출력하고 있네요. 

그런데, 메시지 큐는 어디에 실제로 생성이 될까요? 메시지큐가 실제 생성된 경로는 /dev/mqueue/ 하위에 mq_open시에 지정했던 mq 이름으로 존재하고 있습니다. 알아두는게 좋겠죠?

 

server.c의 코드를 아래와 같이 변경해봅시다. 아래의 코드는 mq_timedreceive를 통해서 일정 10초간 큐에 데이터가 없으면 곧장 종료하게 됩니다. 


#include <stdlib.h>
#include <fcntl.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <time.h>

int main()
{
    struct mq_attr attr;
    attr.mq_maxmsg = 20;
    attr.mq_msgsize = 128;
    char buf[128] = {0,};

    mqd_t mq;
    struct timespec tm;

    mq = mq_open("/reakwon_mq", O_RDWR | O_CREAT,  0666, &attr);
    if (mq == -1)
    {
            perror("message queue open error");
            exit(1);
    }

    clock_gettime(CLOCK_REALTIME, &tm);
    tm.tv_sec += 10;    //현재 시간 + 10초
    if(mq_timedreceive(mq, buf, attr.mq_msgsize, 0, &tm) == -1){
            perror("mq_receive error");
            exit(-1);
    }
    printf("mq received : %s\n", buf);
    mq_close(mq);
}

 

컴파일하고 서버를 실행시켜 10초간 대기해보세요. 아래와 같이 종료하게 됩니다.

# ./server
mq_receive error: Connection timed out

 

단, 그전에 클라이언트를 실행해서 큐에 데이터가 쌓여있다면 그 큐에 있던 데이터가 출력이 됩니다. 

여기까지 POSIX의 메시지 큐 활용에 대해서 알아보았습니다.

반응형