LINUX.ORG.RU

Разбудить epoll_wait() раньше времени.

 


2

2

Есть объект Connection, представляющий соединение. Соединение менеджится epoll-ом. На всякие события, происходит вызов Connection::eventRead() или там Connection::eventWrite().

Connection персистентный, т.е. существует вечно. Что будет если его дропнут - рассмотрено и обработано.

И вот из другого потока я хочу через этот Connection() отправить данные. Для этого в другом потоке надо запихнуть пачку данных в Connection() (в его буфер), перевести состояние Connection в «посылай запрос» и пнуть Connection::eventWrite() - тогда оно эту пачку упихает в дескриптор в один или несколько приёмов (уже по дальнейшим событиям от epoll). Ну там внутри Connection пачка данных ещё и оформляется по нужному протоколу, но это уже не суть, это происходит в момент запихивания.

Казалось бы, всё ништяк. Но я не хочу трогать Connection() из разных потоков. Можно добавить межпоточную синхронизацию в Connection(), но можно ли обойтись?

Хочется epoll-потоку сказать «проснись из epoll_wait() и сделай такую-то хрень». Он должен проснуться из epoll_wait() и найти специальный флаг «сделать хрень» во взведённом состоянии и хрень осуществить. В хрени будет подготовка Connection() к отправке данных и поток снова уйдёт в epoll_wait(). Короче, работа с Connection будет вестись всегда в одном потоке: epoll-потоке.

Как будить epoll_wait()? Ну например можно завести pipe, добавить его в этот же epoll и пихать туда байт 1, а реакцей на побудку epoll_wait() от дескриптора этого pipe и будет моя хрень. А какие ещё существуют способы? Какие есть механизмы IPC, выразимые в (int) дескрипторах и отслеживаемые epoll? И какой из всех 100 вариантов самый быстрый?

P.S.

Вариант с eventfd:

Постинг задачи

// можно и не атомик, а сырой поинтер
std::atomic<T_TASK*> task_;


// Этикетка метода: !!! WARNING !!!
// Запрещено постить 2 раза подряд, очереди нет!
// Запостил - жди исполнения, иначе жопа.
bool exec_task(T_TASK *_task) {
    task_ = _task;

    LOG_PURE("... ... post task");

    uint64_t add = 1;
    ::write(fd_event_, &add, sizeof(uint64_t));

    return true;
}

Исполнение задачи:

// цикл разгребания пачки событий от epoll
...
if ( events[i].data.fd == fd_event_ ) {
    if (events[i].events & EPOLLIN) {
        uint64_t val;
        // TODO: check read() return value.
        // TODO: check val's value.
	read(fd_event_, &val, sizeof(uint64_t));

	(*task_)();
    }
    continue;
}
...



Последнее исправление: hlamotron (всего исправлений: 10)

Если у тебя epoll то можешь завязаться на использование eventfd. А так используют socketpair() один из концов которого засовывают в пулер.

mashina ★★★★★
()
Ответ на: комментарий от mashina

Если у тебя epoll то можешь завязаться на использование eventfd.

Ниасилил. Что значит «завязаться на использование eventfd»? Разжуйте для дебилов.

hlamotron
() автор топика
Ответ на: комментарий от hlamotron

eventfd доступен только в линукс, т.е. решение будет эффективным, но не портабельным.

mashina ★★★★★
()

1) выкинуть велосипед, взять libevent или аналоги 2) сделать выделеный pipe или eventfd чтобы с его помощью прерывать epoll_wait

anonymous
()
Ответ на: комментарий от anonymous

1) выкинуть велосипед, взять libevent или аналоги

А чо не сразу нанять разраба ?

hlamotron
() автор топика

и найти специальный флаг «сделать хрень» во взведённом состоянии и хрень осуществить.

Как минимум это не безопасно: вдруг epoll-поток сейчас не ждёт epoll, а проверяет флаг? Лучше не выпендриваться, а сделать по-нормальному, т.к. захват/освобождение незагруженного мьютекса не сильно дороже, чем запись/чтение в атомарную переменную с барьерами.

vzzo ★★★
()

пулер обрабатывает дескрипторы сокета ? - да, зачем искать варианты когда вон он под носом, локальный сетевой или unix domain сокет, пихаем его в пулер и когда нужно разбудить раньше времени пишем в этот дискриптор

anonymous
()
Ответ на: комментарий от vzzo

Как минимум это не безопасно: вдруг epoll-поток сейчас не ждёт epoll, а проверяет флаг? Лучше не выпендриваться, а сделать по-нормальному, т.к. захват/освобождение незагруженного мьютекса не сильно дороже, чем запись/чтение в атомарную переменную с барьерами.

Он не проверяет флаг до тех пор, пока его не разбулии специальным событием. Флаг может быть взведён только ДО того, как такое событие послано. Т.е. гарантируется, что он не полезет проверять флаг, пока перед этим его не взвели. А взводить до тех пор, пока предыдущий не отработал будет запрещено семантикой интерфейса — кинул задачу - жди исполнения, кинул задачу 2 раза подряд - получи сегфолт.

hlamotron
() автор топика
Ответ на: комментарий от anonymous

пулер обрабатывает дескрипторы сокета ? - да, зачем искать варианты когда вон он под носом, локальный сетевой или unix domain сокет, пихаем его в пулер и когда нужно разбудить раньше времени пишем в этот дискриптор

Искать варианты затем, что существует дофига разных сущностей и возможны нюансы по работе с ними. Остановился на eventfd.

hlamotron
() автор топика
Ответ на: комментарий от hlamotron

только если платформа не будет в дальнейшем расширятся на др ос,а так в libev если заглянуть то универсальный это сокет

anonymous
()
Ответ на: комментарий от anonymous

только если платформа не будет в дальнейшем расширятся на др ос,а так в libev если заглянуть то универсальный это сокет

Да хоспаде, перешишу за вечер под другую платформу с сохранением интерфейса.

hlamotron
() автор топика

Ты не поверишь, но можно поменять набор дескрипторов через epoll_ctl из другого треда. То есть, если у тебя все очень аккуратно спроектировано, то просто добавить EPOLLOUT-watcher.

kawaii_neko ★★★★
()
Ответ на: комментарий от hlamotron

Остановился на eventfd

На всякий случай хочу сразу предупредить: eventfd очень специфичная штука, и когда несколько потоков сделают write в него, вся их деятельность будет «выгребена» за один read. Однажды я об этом недостаточно хорошо подумал и словил интересные «подвисания».

kawaii_neko ★★★★
()
$ man 2 epoll_wait
...
ERRORS
...
       EINTR  The call was interrupted by a signal handler before either (1) any of the requested events occurred or (2) the timeout expired; see signal(7).

...
anonymous
()
Ответ на: комментарий от kawaii_neko

На всякий случай хочу сразу предупредить: eventfd очень специфичная штука, и когда несколько потоков сделают write в него, вся их деятельность будет «выгребена» за один read. Однажды я об этом недостаточно хорошо подумал и словил интересные «подвисания».

Сее я прочитал в мане и был вооружён (-; Повезло. Иногда нет нет, да и прочитаешь ман! Есть режим EFD_SEMAPHORE, в котором за 1 read выгребается единица, а в «обычном» режиме за read выгребается текущее значение счётчика и он сбрасывется. Но я не планирую в несколько потоков дёргать за пипку, наклею этикетку «не дёргать пока предыдущий вызов не отработал».

hlamotron
() автор топика
Ответ на: комментарий от kawaii_neko

Ты не поверишь, но можно поменять набор дескрипторов через epoll_ctl из другого треда. То есть, если у тебя все очень аккуратно спроектировано, то просто добавить EPOLLOUT-watcher.

Поверю. А при чём тут это?

hlamotron
() автор топика
Ответ на: комментарий от hlamotron

А при том. Допустим, в другом треде ты осознал, что нужно к чему-то приконнектиться и что-то там сделать. Допустим, что ты настолько мудр, что связываешь с файловыми дескрипторами структурки

struct epoll_action {
    int fd;
    void (*exec)(struct epoll_action *a, int epoll_mask);
    void *userdata;
};

Тогда в потоке, который инициирует коннект (epoll_wait вертится где-то в другом месте!), мы просто и без лишних костылей начинаем установку соединения, а завершаться она уже будет там, где работает epoll_wait:

struct epoll_event ev;
struct epoll_action *a = malloc(sizeof(*a));
a->fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
a->exec = on_connect;
a->userdata = ...;
connect(a->fd, sock_addr, sizeof(sock_add)); // естественно, проверяешь возможные ошибки,
                                             // потому что коннект на localhost может сразу же получить RST
ev.data.ptr = a;
ev.events = EPOLLOUT | EPOLLERR;
epoll_ctl(epfd, EPOLL_CTL_ADD, a->fd, &ev);
И все, как только пройдет обмен syn-ами, epoll_wait в сетевом потоке проснется и бросится обрабатывать событие.

Ни одна из упомянутых в треде библиотек не позволяет такие трюки (ровно как и EPOLLET, поэтому все epoll-based библиотеки априори конченные, а кроссплатформенные - конченные в квадрате).

kawaii_neko ★★★★
()
Ответ на: комментарий от kawaii_neko

Спасибо за развёрнутую телегу с блек-джеком и шлюхами!

Чё-то в конце не осилил. Если epoll-позволяет, то как можно все epoll-based библиотеки конченными называть? Не улавливаю тонкого намёка... А при чём тут EPOLLET в этом свете трюков?

Вообще у вас описан случай установки новых коннектов и похоже я так и делаю (судя по моему быдлокоду). То есть, когда мой epoll_wait()-поток ещё не знает про Connection, я невозбранно в другом потоке проделываю с ним socket, connect и т.п. и только потом отношу в epoll и дальше только получаю события по нему.

Но у меня ситуация другая - epoll_wait()-поток уже знает про connection, уже отслеживает события на нём и потому, когда хочется сделать что-то непредвиденное с этим коннектом, хотелось бы осуществить это от имени epoll_wait()-потока. Альтернатива - напихать мьютексов в Connection и не парить моск.

hlamotron
() автор топика
Ответ на: комментарий от hlamotron

Если epoll-позволяет, то как можно все epoll-based библиотеки конченными называть?

Потому что библиотеки пишут в расчете на то, что пользоваться ими будут криворукие быдлокоде, поэтому, несмотря на то, что epoll_ctl потокобезопасный, нет ни одной библиотеки, которая бы позволяла менять watcher-ы из других тредов.

Если тебе нужно подменять обработчик прямо «на лету» (т. е. из другого потока изменять активный обработчик, который в этот момент может быть «возвращен» из epoll_wait как сработавший, простых и элегантных решений нет (простое будет течь по памяти, а элегантного даже в голову не приходит - везде нужно дублирование объектов, находящихся внутри epoll-а с мьютексами и прочими вытекающими).

Неэлегантное решение выглядит так: есть потокобезопасное отображение fd => shared_ptr<handler_with_state>, если нужно что-то поменять, сначала меняется handler_with_state, а затем выполняется EPOLL_CTL_MOD. handler-ы нужно писать очень аккуратно в расчете на то, что handler может быть вызван на «не правильную» маску событий (т. е. когда указатель на него уже подменили, а epoll_ctl не успели выполнить).

Всегда есть вариант организовать цепочку обработчиков (как в nginx) и навешивать на нее дополнительные handler-ы - это, наверное, может выглядеть чуть менее криво.

Короче говоря, если тебе нужно заставить epoll_wait-поток сделать что-то по событию от существующего соединения - epoll_ctl тебе в руки. Если же нужно просто творить рандомный хардкор, никак не связанный с событиями на отслеживаемых дескрипторах, самое простое, что может быть - завести pipe и запихивать в него указатели на handler_with_state, а в обработчике этого пайпа просто

struct handler_with_state *h;
if (read(pipe_fd, &h, sizeof(h)) == sizeof(h))
  h->exec(h); // память освобождать не забываем
Правда если таких событий будет очень много, то ощутимо поднимется sys cpu usage (а что ты хотел - на каждое событие read+write+возврат из epoll_wait).

kawaii_neko ★★★★
()
Ответ на: комментарий от kawaii_neko

Если же нужно просто творить рандомный хардкор, никак не связанный с событиями на отслеживаемых дескрипторах, самое простое, что может быть - завести pipe и запихивать в него указатели на handler_with_state, а в обработчике этого пайпа просто...

Советуешь какую-то невероятную чушь. Эффективная и простая обвязка над пулером состоит из неблокирующей очереди с событиями и аналога evenfd для запуска вычитывания.

Правда если таких событий будет очень много, то...

То очередной write(...) в пайп скорее всего заблокируется.

ощутимо поднимется sys cpu usage (а что ты хотел - на каждое событие read+write+возврат из epoll_wait).

Основную нагрузку на cpu в твоей схеме даст работа с пайпом и всё особенно будет плохо если писать в него из нескольких потоков.

mashina ★★★★★
()
Ответ на: комментарий от mashina

Эффективная и простая обвязка над пулером состоит из неблокирующей очереди с событиями и аналога evenfd для запуска вычитывания.

Твоя очередь:

  1. write(event_fd, &var64, 8);
  2. epoll_wait
  3. read(eventfd, &dummy, 8);
  4. pthread_mutex_lock
  5. ev = get_event
  6. pthread_mutex_unlock
  7. handle_event(ev)

Мой вариант:

  1. write(pipe_fd, &ptr, sizeof(void*))
  2. epoll_wait
  3. read(pipe_fd, &ev, sizeof(void*))
  4. handle_event(ev)

Основную нагрузку на cpu в твоей схеме даст работа с пайпом

Ну да, это же такой тяжеловесный примитив. Я даже не буду спрашивать, известно ли тебе что-нибудь о splice, который через этот самый «тяжелый пайп» проталкивает данные, увеличивая вдвое число syscall-ов, необходимых для передачи данных между двумя дескрипторами.

То очередной write(...) в пайп скорее всего заблокируется.

Кто мешает сделать его неблокирующимся? В любом случае, если у обработчика в очереди скопилось 8192 элемента, это указывает на какую-то проблему.

и всё особенно будет плохо если писать в него из нескольких потоков

А вот с этого момента подробнее. Насколько мне известно pipe - лучший способ организовать mpmc очередь между io-bound потоками.

kawaii_neko ★★★★
()
Ответ на: комментарий от anonymous

а откуда взялось требования прервать пулер ? может банально уменьшить таймаут ?

Требование не «прервать пулер», а «выполнить кусок кода прямо сейчас из потока пулера».

hlamotron
() автор топика
Ответ на: комментарий от kawaii_neko

Твоя очередь:

Очередь же не блокирующая, никаких мьютексов не будет. В обоих случаях логика несколько сложнее, не понятно что ты хочешь сравнить слишком умозрительной схемой.

Ну да, это же такой тяжеловесный примитив. Я даже не буду спрашивать, известно ли тебе что-нибудь о splice, который через этот самый «тяжелый пайп» проталкивает данные...

Прекратил бы фантазировать и посмотрел как работает пайп, тогда может быть станет ясно почему write(...) на несколько порядков тяжёлее атомарных чтения и CAS + eventfd который даже не всегда будет дёргаться.

Насколько мне известно pipe - лучший способ организовать mpmc очередь между io-bound потоками.

У нас схема mpsc, т.е.один потребитель. Не понятно что ты подразумеваешь под «лучший». Если производительность, то пайп точно не может быть лучше mutex + condvar, а ещё существует мн-во реализаций очередей на атомиках и фьютексах с ещё более лучшей производительностью.

А вот с этого момента подробнее.

Кроме этого в онтопике пайп хуже масштабируется с ростом потоков даже по сравнению с AF_UNIX сокетами - cpu и задержки растут быстрее N и пайп обычно начинает сливать с 2-3 потоков. Всегда можешь написать небольшой тест и сравнить разные очереди (например, такой тест на пробуждение потребителя покажет и производительность очереди).

mashina ★★★★★
()
Ответ на: комментарий от mashina

У нас схема mpsc, т.е.один потребитель

Ну так-то каждый может

Не понятно что ты подразумеваешь под «лучший»

Я вроде бы ясно написал «mpmc между io-bound потоками». condvar тут вообще ничем не поможет. А уж по объему написанного кода передача указателя через pipe уделывает всех.

Кроме этого в онтопике пайп хуже масштабируется с ростом потоков

Не подозревал о таком. Однако тест у тебя довольно странный: блокирующее IO, никаких реальных очередей данных для cond/evfd (особенно круто для последнего не задавать EFD_SEMAPHORE, таким образом обеспечивая один read на кучу write-ов, что в mpmc не годится).

kawaii_neko ★★★★
()
Ответ на: комментарий от kawaii_neko

Ну так-то каждый может

А где нам нужно mpmc? Вроде же нигде не рассматриваем несколько пулеров и т.д.

Я вроде бы ясно написал «mpmc между io-bound потоками». condvar тут вообще ничем не поможет.

condvar для сравнения чтобы можно было оценить сколько верхеда даёт работа с файловыми дескрипторами по сравнению с обычной очередью без пулера. Можно же io события обрабатывать в отдельном от пулера потоке.

Однако тест у тебя довольно странный: блокирующее IO, никаких реальных очередей данных для cond/evfd (особенно круто для последнего не задавать EFD_SEMAPHORE, таким образом обеспечивая один read на кучу write-ов, что в mpmc не годится).

На eventfd очередь сделать нельзя, он годиться только для пробуждения пулера и делать очередь на пайпах/сокетах тоже смысла не имеет из-за дороговизны сискола (если есть альтернативы). Тест на механизмы пробуждения, но в случае с пайпами работает эквивалентно очереди, а для eventfd показывает наихудший сценарий lock-free очереди с пробуждением через eventfd (на каждый push нужно делать write(...) что случается не всегда).

mashina ★★★★★
()
30 августа 2018 г.
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.