LINUX.ORG.RU

[C++] события и pthreads

 


0

1

Предположим, есть приложение в котором должен быть свой внутренний механизм событий. Функции менеджера событий будет выполнять отдельный поток, контролирующий списки обработчиков событий - т.е регистрация/удаление обработчиков, прием событий, активация обработчиков. Т.к обработчики должны вызываться там же, откуда пришел запрос на регистрацию(в контексте того же потока), то возникает вопрос: как можно вызвать функцию в контексте нужного нам потока при помощи pthreads? Маны читал, хедер pthread.h читал, да и гугл с опеннетом не обошел стороной, но как-то не нашел чего-то хорошего. Или я может изначально хочу чего-то странного?

Знаю, есть буст и POCO, пытался их применить, при помощи NotificationQueue в POCO даже получилось, но чувство дичайшего оверхеда меня не покидало на протяжении всего процесса.

1. Не надо придумывать велосипед - используй куте. Там есть сигналы и слоты. Да и кастом-ивенты тоже.

2. Считающий семафор + callbacks в потоках. Менеджен ивентов блокируется на семафоре. Тот кто создаёт ивент, записывает его в очередь и инкрементирует семафор. Менеджер ивент извлекает из очереди, обрабатывает и потом удаляет (последнее опционально, зависит от общей архитектуры). Далее менеджер опять переходит на семафор и т.д.

mi_estas
()
Ответ на: комментарий от mi_estas

Не надо придумывать велосипед - используй куте. Там есть сигналы и слоты. Да и кастом-ивенты тоже.

это, конечно, отличный путь для минимизации оверхеда :)

shty ★★★★★
()

переключение контента - очень долгий процесс
два потока обменивающихся информацией - загрузят на 100% бесполезной работой процессор уже на 70-120 тысяч переключений в секунду
ты уверен что такое тебе нужно ?

а распределение задач - если нужно именно дергать работающий - то есть обычьные сигналы unix-овые - они именно прерывают выполнение текущей задачи

ae1234 ★★
()
Ответ на: комментарий от mi_estas

1. куте и прочее пока подождет - уже был как-то печальный опыт использования толстых либ и фреймворков без крепких знаний уровнем ниже.

2. Немного не так. Вот как раз мне и не нужно, чтобы ивент менеджер обрабатывал данные(ведь он будет блокироваться, а хочется прозрачного управления рабочими потоками). Мне приходят данные по сети, поток, отвечающий за сетевой ввод/вывод генерирует событие, ивент менеджер ловит, и вызывает обработчики по списку, а обработчики регистрировались потоками-воркерами. А данные потенциально могут приходить очень часто, так что жадть, пока ивент-менеджер обойдет все обработчики(а новые события тем временем будут прибывать) это кривизна в архитектуре, ИМХО.

Так же хочется, чтобы разработчик модуля к моему приложению мог написать что-то вроде

static void onDataIncome(...) {

...

}

...

MODULE modMain(...) {

System::Events::registerHandler(DATA_INCOME, onDataIncome);

... while(1) System::Events::event_wait(DATA_INCOME);

}

И при вызове event_wait поток модуля бы блокировался, до генерации системой события. А вот когда событие происходит, то поток модуля начинал бы работать в зарегистрированной функции. Т.е чтобы был ясный интерфейс для разработчика модуля, вроде модели событий в Javascript.

Number_Seven
() автор топика

> как можно вызвать функцию в контексте нужного нам потока при помощи pthreads?

Никак.

Да и вся схема странноватая.

tailgunner ★★★★★
()

Асинхронная actor-подобная модель реализована с дополнительным функционалом в Theron, вроде как гарантируется соблюдение последовательности исполнения сообщений.

Потоки реализованы через threadpool

westtrd
()
Ответ на: комментарий от ae1234

/*в предыдущем сообщении кое-где про разметку забыл в псевдокоде, надеюсь понятно будет*/

переключение контента

контекста?

Мне главное, чтобы оно логически быстро работало и имело возможность расширяться - т.е максимально эффективное использование тредпула из воркеров, поскольку на них будет идти основная нагрузка. Конечно, с ивент менеджером возможны проблемы при частом дерганьи контекста, но пока я не вижу другого решения.

Number_Seven
() автор топика
Ответ на: комментарий от westtrd

Хм, спасибо, попробую посмотреть Theron, на первый взгляд можно приспособить.

Number_Seven
() автор топика
Ответ на: комментарий от Number_Seven

а какая длительность работы воркеров над событием ? если она чисто логическая - без долгих расчетов - то весь выйгрыш сьест переключение контента

может просто в одном потоке сделать кучу мелких воркеров ? ;)
и неблокирующий ввод вывод

ae1234 ★★
()
Ответ на: комментарий от ae1234

В идеале, конечно, хорошо бы иметь эффективность на разном времени работы воркеров. Но идеал недостижим)

Воркеры может будут и не очень долго работать, проблема в том, что данные по сети могут валить очень быстро.

Number_Seven
() автор топика
Ответ на: комментарий от Number_Seven

ивент менеджер ловит, и вызывает обработчики по списку

на сколько я понял, тебе нужно эти обработчики запускать в контексте потока, которые регистрировал обработчик. Если так, тогда единственный механизм, который мне известен - это сигналы, но они являются сущностями одними и теми же в рамках одного процесса. Т.е. для всех потоков они одни и теже. Кроме того, согласно докам, невозможно предсказать, какой поток будет эти сигналы обрабатывать (если конечно маску не установить перед тем как). Так что, что бы использовать сигналы тебе надо будет не потоки, а процессы форкать. Но тогда подозреваю проблемы с передачей данных события из одного процесса в другой.

Если у тебя однопроцессорная система потоки использовать нет смысла. Делай select/poll на сокете и обрабатывай всё в одном потоке. С одним ядром так будет быстрее всего.

Да и это, я перечитал тред и думаю что:

как можно вызвать функцию в контексте нужного нам потока при помощи pthreads?

быстро - никак.

Или я может изначально хочу чего-то странного?

Да.

... while(1) System::Events::event_wait(DATA_INCOME);

Вот это либо блокировка, либо busywait. Для первого нужен семафор (смотри мой первый пост) или другой блокирующий примитив, для второго загрузка проца на 100% перманентно.

mi_estas
()
Ответ на: комментарий от Number_Seven

Воркеры может будут и не очень долго работать, проблема в том, что данные по сети могут валить очень быстро.

select/poll - в данном случае то, что доктор прописал.

mi_estas
()

как можно вызвать функцию в контексте нужного нам потока при помощи pthreads?

Никак.

Если у тебя в каждом потоке работает свой event-loop, то в различных обертках над poll/select/kqueue/epoll есть механизмы посылки сообщений в конкретный поток. Например, в boost::asio можно обернуть функцию boost::bind'ом и послать в event-loop из другого потока, при этом она будет выполнена в потоке event-loop'а.

А вообще ты хочешь странного.

Reset ★★★★★
()
Ответ на: комментарий от mi_estas

А я бы написал для теста и измерил перформанс. Вот тогда бы и поговорить было о чём.

а причём тут оверхед и быстродействие? я про то что тащить для сравнительно небольшой, к примеру, программулины библиотеку на десяток мегабайт - не самое лучшее, видимо, решение

shty ★★★★★
()
Ответ на: комментарий от Number_Seven

ну а я про что - если данные из сети будут валиться очень быстро - то эффективнее однопоточного решения просто нету
(если только воркеры незаняты долгими вычислениями)

неблокирующий ввод вывод - и ограниченная (как можно меньшая) величина кванта работы воркеров - и нулевой оверхед изза переключений контента

ae1234 ★★
()

Можно boost::asio взять. Я так понял, треды у тебя привязвны к каким-то объектам (а ля active objects, actors). Если у каждого будет свой asio::io_service, можно при регистрации ссылку (или weak_ptr) на нужный io_service передавать - диспетчер (или даже тот, кто генерирует сигнал, т.к. диспетчер вроде и не нужен) будет постить (io_serivce::post) вызов в этот сервис.

ratatosk
()
Ответ на: комментарий от ratatosk

p.s. У меня в проекте треды не привязаны к объектам, io::service один на все приложение. Использую boost::signals2, если надо вызвать сигнал так, чтобы обработчики вызвались в другом треде (например, если в текущем надо что-то быстро сделать), делаю так

get_io_service().post(boost::bind(boost::ref(on_status_changed), status_));

ratatosk
()
Ответ на: комментарий от mi_estas

Пользовательских сигналов в общем-то раз, два, ... и нету. А если использовать отдельные процессы, то это конечно хорошо, и есть shared memory и пайпы для данных, но это тянет за собой кучу проблем в виде синхронизации.

Ну тут ниже уже объяснили что к чему, пойду думать дальше по поводу архитектуры.

Всем спасибо!

Number_Seven
() автор топика
Ответ на: комментарий от one_more_hokum

Не асинхронный он, и еще может быть задача обеспечить гарантированную последовательность вызовов экземляров сообщений.

westtrd
()

велосипед на boost::thread

мой ответ лежит в контекте вопроса: «как можно вызвать функцию в контексте нужного нам потока при помощи pthreads»

typedef boost::function< void( void ) > simple_function;
typedef std::list<boost::simple_function> q_t;
class Thread
{
	q_t qt_;
	boost::condition_variable cond_;
	bool stop_flag_;
	boost::lock_как_его_там mylock_;
public:
	void stop()
	{
		stop_flag_ = true;
		cond_.notify_all();
	}
	void add_task( boost::function& f )
	{
		boost::unique_lock< ... > locker;
		qt.push_front( f );
	}
	size_t task_count() // for load-balansing
	{
		return qt.size();
	}
	void threadfunc()
	{
		while( cond_.wait() && !stop_flag_ )
		{
			if( qt_.empty() ) continue;
			boost::unique_lock< ... > locker;
			qt.front()(); // вызвать
			qt.pop_back(); // убрать из очереди
		}
	}
}
// класс, методы которого нужно вызывать из других потоков
struct MyClass1{ void DBOperation1( const char* name ) { db.update( name ) } };

int main()
{
	MyClass1 db1;
	Thread ths[10];
	boost::thread* bths[10];
	// startup
	for( int i = 0; i < 10; i++ )
	{
		bths[i] = new boost::thread( boost::bind( Thread::threadfunc, &ths[i] ) );
	}
	// use
	for( i = 0; i < 100000000; i++ )
	{
		ths[i%10].add_task( boost::bind( MyClass1::DDBOperation1, &db1, "Sergey" ) )
	}
	//stop
	for( i = 0; i < 10; i++ )
		ths.stop();
	// join
	for( i = 0; i < 10; i++ )
	{
		bths[i]->join();
		delete bths[i];
	}
	
}

И где здесь оверхед? и да, как Reset замечает, через boost::asio тоже можно, но там коллбеки передаются через системный вызов...

anymouse
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.