LINUX.ORG.RU

Очередь сообщений с ожиданием нового сообщения в ядре по типу select()

 ,


1

2

Пишу приложение, в котором обмен сообщениями между несколькими потоками я собираюсь организовать при помощи очередей сообщений. При чем каждый поток должен ожидать сообщения от нескольких потоков одновременно. То есть, алгоритм ожидания событий должен быть полностью идентичен алгоритму ожидания событий в select(). Сами очереди событий буду писать сам, объектами SysV пользоваться не буду, так как мне не нужно взаимодействие между процессами. Проблема в том, что все алгоритмы, реализации которых мне удалось найти предлагают различные варианты с периодическим опросом и ожиданием, типа sleep(). Но в них присутствует задержка на обработку событий и бессмысленное периодическое просыпание потоков для проверки. Есть ли вариант реализации без этих дефектов? Прошу помощи с информацией.

Возможно стоит попробовать EM-ODP. Помещаете сообщения в эвенты, отправляете их в очереди, принимаете в еошках, которые привязаны к очередям. Если несколько типов очередей - atomic, ordered, parallel. При этом исполнение еошек возможно одновременно на нескольких ядрах и они будут работать, только когда диспетчер вызовет.

Patifon_Kakao
()

Через condition variable пробовал? Если не через него, то остаются варианты с select/poll по eventfd и io_uring
Ядру нет особой разницы, разные у тебя процессы или потоки, вся разница там в шаренных памяти и дескрипторах. Так что решение будет скорее всего такое же, как в случае ipc.
Единственное отличие - в твоём случае может использоваться futex, чтобы как раз лишний раз не пробуждать потоки, как я понимаю, он как раз используется для реализации pthread_cond_wait

mittorn ★★★★★
()

Вам нужно, чтобы поток заснул в ожидании событий, не используя cpu до их наступления. Верно? Обеспечить засыпание возможно только посредством ядра. Т.е. select/poll/epoll использовать придётся. Другое дело, как использовать - тут уже простор для фантазии.

Sorcerer ★★★★★
()

Для аналогичной задачи я использовал связку pipe() и condition variable. Например, основной поток очереди ждет данных по condition variable. При поступлении сообщения он передает его worker’у через дескриптор, выданный pipe(). Таким образом в worker’е получится полностью отказаться от ожиданий по времени и обойтись лишь одним select().

anonymous
()

Пишу приложение, в котором обмен сообщениями между несколькими потоками я собираюсь организовать при помощи очередей сообщений

и в чём собственно вопрос ? могу порекомендовать спросить гуглу про «очереди сообщений»..

там настолько тьма вариантов. что выбрать можно только зная нюансы решаемой задачи.

в первую очередь смотрят zmq, у него есть спец-режимы «внутри одного процесса между тредами». Но он низкий уровень, можно найти что повыше

MKuznetsov ★★★★★
()

У меня была подобная задача: куча тредов складывают задачи в очередь а тред из пула тредов берет задачу и обрабатывает. Схема работы примерно такая:

В потоках обрабатывающих запросы вызывается submitTask(data) где data указатель на структуру с данными для обработки.

int submitTask(client_data *data) {
    pthread_mutex_lock(&mutexQueue);
    if (taskCount > MAX_QUEUE_SIZE - 1 ) {
        logger(ERROR, "Cannot process request, processing queue is full!\n", "");
        pthread_mutex_unlock(&mutexQueue);
        pthread_cond_signal(&condQueue);
        return 1;
    }
    taskQueue[taskCount] = data;
    taskCount++;
    pthread_mutex_unlock(&mutexQueue);
    pthread_cond_signal(&condQueue);
    return 0;
}

В каждом треде процессинга:

    while (true) {
        ...
        // ----------------Begin Critical section --------------------
        pthread_mutex_lock(&mutexQueue);
        while (taskCount == 0 && !quit_signal) {
            pthread_cond_wait(&condQueue, &mutexQueue);
        }

        if (!quit_signal) {
            data = taskQueue[0];
            for (register int i = 0; i < taskCount - 1; i++) {
                taskQueue[i] = taskQueue[i + 1];
            }
            taskCount--;
        } else {
            pthread_mutex_unlock(&mutexQueue);
            goto thread_exit;
        }
        pthread_mutex_unlock(&mutexQueue);
        // ----------------End Critical section -----------------------
       ...
   }

Работает как швейцарские часы (с).

iron ★★★★★
()
Последнее исправление: iron (всего исправлений: 1)

При чем каждый поток должен ожидать сообщения от нескольких потоков одновременно.

В термина ZeroMQ. Принимающий SUB+bind, отправляющие PUB+connect.

AlexVR ★★★★★
()

А зачем потокам просыпаться для проверки? Вообще, похоже на банальный event loop.

Делаешь кольцевой буфер (таже очередь, в общем-то), кидаешь в него сообщения и регистрируешь таблицу/список callback-ов, которые будет дёргать event loop. Ну, если не хочется дёргать все потоки на каждое сообщение, можно сделать логику масок, чтобы evloop дёргал потоки только на интересные для них сообщения.

Ну как-бы и всё: никто не пишет в evloop - все потоки спят, в evloop кто-то написал - он дёргает потоки, которые подписались на сообщения (ну или все, если маскирования нет).

В общих чертах и libuv/libev/libevent устроены по тому же принципу.

SkyMaverick ★★★★★
()
Последнее исправление: SkyMaverick (всего исправлений: 2)

… между несколькими потоками … очередей сообщений … каждый поток должен ожидать сообщения от нескольких потоков одновременно.

Модель акторов? Для разных ЯП есть свои реализации. Для С++ есть SObjectizer. Если уж хочется самому. То в путь.

Сами очереди событий буду писать сам, объектами SysV пользоваться не буду, так как мне не нужно взаимодействие между процессами.

Сам так сам. Но доступ к очереди из разных потоков требует синхронизации.

Проблема в том, что все алгоритмы, реализации которых мне удалось найти предлагают различные варианты с периодическим опросом и ожиданием, типа sleep().

Сон бывает разный. Для уведомлений есть «Condition variable»

AlexVR ★★★★★
()
Ответ на: комментарий от PRN

Спасибо. Слишком мощный инструмент. Опять-таки, там почти все заточено под IPC. Использовать его в моем маленьком сервере, где сообщения надо между потоками гонять, помоему слишком жирно.

leonopulos
() автор топика