LINUX.ORG.RU

Оцените потокобезопасность кода

 


0

2

Есть вот такой код:

Thread * volatile threadQueue[TASK_SCHEDULER_MAX_PRIORITY + 1];
int threadMaxRunningPriority = 0;
Mutex threadQueueMutex;
Thread * volatile threadResumeQueue;
Mutex threadResumeQueueMutex;

void schedulerInit(void) {
	mutexInit(&threadQueueMutex);
	mutexInit(&threadResumeQueueMutex);
}

inline Thread *schedulerFindNextTask(void) {
	Thread *nextThread = NULL;
	if (mutexTryLock(&threadQueueMutex)) {
		for (; threadMaxRunningPriority >= 0; threadMaxRunningPriority--) {
			if (threadQueue[threadMaxRunningPriority] != NULL) {
				nextThread = threadQueue[threadMaxRunningPriority];
				threadQueue[threadMaxRunningPriority] = nextThread->nextScheduled;
				break;
			}
		}
		mutexUnlock(&threadQueueMutex);
	}
	return nextThread;
}

void schedulerResumeTask(Thread *thread);

void schedulerResumeDelayedTasks(void) {
	if (mutexTryLock(&threadResumeQueueMutex)) {
		while (threadResumeQueue != NULL) {
			Thread *thread = threadResumeQueue;
			if (__sync_bool_compare_and_swap(&threadResumeQueue, thread, thread->nextScheduled)) {
				schedulerResumeTask(thread);
			}
		}
		mutexUnlock(&threadResumeQueueMutex);
	}
}

void schedulerResumeTask(Thread *thread) {
	if (mutexTryLock(&threadQueueMutex)) {
		if (threadQueue[thread->priority] == NULL) {
			thread->nextScheduled = thread;
			thread->prevScheduled = thread;
			threadQueue[thread->priority] = thread;
		} else {
			thread->nextScheduled = threadQueue[thread->priority];
			thread->prevScheduled = thread->nextScheduled->prevScheduled;
			thread->nextScheduled->prevScheduled = thread;
			thread->prevScheduled->nextScheduled = thread;
		}
		if (thread->priority > threadMaxRunningPriority) {
			threadMaxRunningPriority = thread->priority;
		}
		mutexUnlock(&threadQueueMutex);
		schedulerResumeDelayedTasks();
	} else {
		do {
			thread->nextScheduled = threadResumeQueue;
		} while (!__sync_bool_compare_and_swap(&threadResumeQueue, thread->nextScheduled, thread));
	}
}

void schedulerSuspendTask(Thread *thread) {
	mutexLock(&threadQueueMutex);
	if (threadQueue[thread->priority] == thread) {
		threadQueue[thread->priority] = thread->nextScheduled;
		if (threadQueue[thread->priority] == thread) {
			threadQueue[thread->priority] = NULL;
		}
		return;
	}
	thread->nextScheduled->prevScheduled = thread->prevScheduled;
	thread->prevScheduled->nextScheduled = thread->nextScheduled;
	mutexUnlock(&threadQueueMutex);
	schedulerResumeDelayedTasks();
}

Поля структуры Thread nextScheduled и prevScheduled описаны как Thread * volatile.

Условия такие:

1) schedulerFindNextTask может прервать любую функцию, кроме самой себя. Ей допустимо иногда возвращать NULL вместо того, чтобы выполнить свою работу.

2) schedulerResumeTask может прервать любую функцию, в том числе саму себя. Она не имеет права блокироваться.

3) schedulerSuspendTask не может прервать никакую функцию, но два потока могут вызвать эту функцию одновременно. Допустима блокировка выполнения.

Больше никакой код к полям nextScheduled и prevScheduled, а также вышеописанным глобальным переменным не обращается.

1) Достаточно ли проверок и условий в этих функциях, чтобы гарантировать, что структура данных (связанный список) не будет никогда повреждена и не возникнет зависания (schedulerFindNextTask и schedulerResumeTask не имеют права блокироваться ни при каких обстоятельствах).

2) Можно ли доверять встроенной функции GCC __sync_bool_compare_and_swap на платформе ARM Cortex-M3?

3) Возможно ли переписать данный код более оптимально, сохранив безопасность?

★★★★★

Последнее исправление: KivApple (всего исправлений: 5)
Ответ на: комментарий от shkolnick-kun

Ты не прав. Цикл while означает «выполнять, пока не получится сделать всё атомарно». То есть, если нас не прерывали, то случится break на первой же итерации. Если прервали, то придётся кое-что переделать (структура данных при этом не будет разрушена).

Добавление (пробуждение ото сна) задачи в список выполняется за О(1).

Вот с удалением есть проблема, потому что надо найти предыдущий элемент (вложенный while). Там сложность O(N). Но отправлять задачу в сон обработчики прерывания не будут, только обычные процессы.

Кстати, N здесь количество не спящих задач с таким же приоритетом, а не общее количество задач.

Я многое переделал в коде. Теперь последний элемент списка указывает на NULL.

http://pastebin.com/EqG53n4F

А вот упрощённый вариант алгоритма добавления-удаления, если лень разбираться, http://pastebin.com/2TvDf0Sy.

У меня срабатывает assert при тестировании. Не могу понять, где ошибка.

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 3)
Ответ на: комментарий от KivApple

Цикл while означает «выполнять, пока не получится сделать всё атомарно». То есть, если нас не прерывали, то случится break на первой же итерации.

Спин блокировка - вид сбоку.

Вот с удалением есть проблема, потому что надо найти предыдущий элемент (вложенный while). Там сложность O(N). Но отправлять задачу в сон обработчики прерывания не будут, только обычные процессы.

Какая разница, где будет превышено максимальнодопустимое время выполнения?

Конец немного предсказуем.

http://pastebin.com/EqG53n4F

Лучше на гитхаб

shkolnick-kun ★★★★★
()
Ответ на: комментарий от KivApple

У меня срабатывает assert при тестировании. Не могу понять, где ошибка

Ну так до CAS(list.head, item->next, item) в общем случае (item != head), т.к. List.head еще не равно item.

shkolnick-kun ★★★★★
()
Ответ на: комментарий от shkolnick-kun

Я бы сказал не в общем случае, а вообще такого не должно быть, чтобы item == head до присваивания. Ведь голова списка по определению указывает на такой элемент, который в списке есть. А именно последующий CAS является добавлением элемента в список, до этого элемент списку не принадлежит. И assert(item != head) вызывает падение программы лишь при условии, что item == head. То есть возникает невозможная ситуация, которая возможна лишь при разрушении списка.

Спин блокировка - вид сбоку.

Не совсем. Вот смотри. В той же ChibiOS/RT после каждого прерывания запускается перепланирование задач, если что-то изменилось. У меня перепланирование (переход на следующую итерацию цикла while вместо выхода из него) происходит только если случилось прерывание во время планирования. То есть просто к стоимости каждого прерывания добавляется стоимость новой итерации цикла. С учётом того, что цикл маленький, эта добавленная стоимость будет очень мала. То же касается и функций добавления/удаления. К стоимости прерывания добавляется стоимость перезапуска всех функций добавления/удаления, а также планировщика, которые он прервал.

И все эти циклы гарантированно завершаться как только перестанут приходить прерывания. Однако если прерывания приходят так часто, что даже коротенькие функции не успевают дойти до конца, то система явно перегруженна, а в таких условиях никакая RTOS ничего не гарантирует (кроме того, что высокоприоритетные прерывания, не использующие функции RTOS, будут нормально работать, но у меня тоже так).

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 1)
Ответ на: комментарий от KivApple

Если падает, значит есть две конкурентные попытки вставить один и тот же элемент в один и тот же список.

Значит алгоритм кривой, раз такое можно сотворить.

Пишите, Шура, пишите... (С)

anonymous
()
Ответ на: комментарий от anonymous

А вот и не угадал. Сделал такую штуку. Добавление элемента - lock-free. Перед удалением хватаю mutex, а после удаления отпускаю. То есть удаление не может прерывать удаление, но может прерывать добавление, а добавление может прерывать и само себя и удаление.

И всё работает. А если сделать наоборот (lock-free удаление и добавление с mutex), то ошибка не исчезает. Значит ошибка таки в функции удаления.

KivApple ★★★★★
() автор топика
Ответ на: комментарий от KivApple

В той же ChibiOS/RT после каждого прерывания запускается перепланирование задач, если что-то изменилось.

Это правильно.

А вот и не угадал. Сделал такую штуку. Добавление элемента - lock-free. Перед удалением хватаю mutex, а после удаления отпускаю. То есть удаление не может прерывать удаление, но может прерывать добавление, а добавление может прерывать и само себя и удаление.

А вот тут фейл. Надо брать отладчик и выяснять «что пошло не так», а ты пытаешься методом тыка определить «как правильно».

И да, у тебя во время траверса этого твоего стека не захватываются блокировки элементов, что может привести к повреждению этого самого стека. С другой стороны, если ты будешь их захватывать, может случиться дэдлок.

Вот поэтому то и нужна блокировка всего стека.

shkolnick-kun ★★★★★
()
Ответ на: комментарий от shkolnick-kun

Я смог таки запилить lock-free планировщик - http://pastebin.com/HirFnhMb. Правда он немного странный. Я использую ленивое усыпление нитей. То есть усыпление по факту приводит лишь к выставлению спец. флага. А планировщик при попытке переключиться на процесс с таким флагом удаляет его из очереди и идёт к следующему. Если нить пробудят до того, как до неё дойдёт очередь, то никаких операций со списком не потребуется.

Прогнал некоторые тесты. Вот результаты (всё время в наносекундах, график построен по первому блоку тестов):

https://pp.vk.me/c627616/v627616975/650cd/ofCdRyvTK-o.jpg

Процессор - Core i5-4210U (4 ядра). 1 поток (настоящий, линуксовый) 10 миллионов раз вызывает schedulerNextThread. 3 потока (тоже настоящих) в бесконечном цикле (точнее пока не закончит выполнение главный поток) генерируют 2 случайных числа. Первое - номер потока (не настоящего, а моего), второе - действие (отправить в сон или наоборот пробудить). И выполняют это действие с указанным потоком, замеряя время выполнения действия. При этом если поток уже был в нужном состоянии или заблокирован другой операцией (если кто-то уже начал добавлять/удалять поток, то функция выходит ни с чем - кто первый начал, тот и прав), то её время выполнения не учитывается в общей статистике. Ах да, процесс запускаю от root и с nice -19.

Можно заметить, что время исполнения Suspend и Resume практически не зависит от количества потоков, а время поиска нового потока зависит примерно линейно. Впрочем, это так лишь в худшем случае, когда планировщику приходится за раз усыплять кучу потоков, прежде чем он найдёт бодрствующий.

На мой взгляд условия теста более жёсткие, чем бывает на микроконтроллерах в реальных задачах, так что для грубой оценки сверху вполне подходят (весьма сомнительно, что в штатных условиях работы, может быть результат хуже). А ещё ведь микроконтроллер одноядерный и уровень вложенности вызовов функций suspend/resume весьма ограничен, так что потери на CAS должны быть значительно меньше. Resume должен выполняться быстрее, чем FindNext.

Получается, что среднее время работы планировщика на Core i5 - 50 наносекунд (вряд ли кто-то будет запускать на МК сотни процессов, да ещё и непрерывно будить и усыплять их, спящие потоки ресурсов кроме памяти не жрут, ибо удалены из очереди). Есть идеи, как можно очень грубо перевести это в ситуацию с микроконтроллером? Во сколько раз мой Core i5 круче какого-нибудь STM32F103?

P.S.: Проверял корректность работы планировщика на большем числе итераций (вплоть до 10 миллиардов) с различными уровнями оптимизации (O0, O2 и O3). Ни при каких обстоятельствах не происходит разрушения данных или зависания. А функция замера времени отбрасывает оверхед от вызова функций получения timestamp.

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 2)
Ответ на: комментарий от KivApple

А планировщик при попытке переключиться на процесс с таким флагом удаляет его из очереди и идёт к следующему.

Подобную технику использовали в L4, она приводит к опасности отказа в обслуживании, поэтому в seL4 от нее отказались.

Если работает - пока не трогай. Но лучше все таки найти замену.

shkolnick-kun ★★★★★
()
Ответ на: комментарий от tailgunner

Так оно сейчас фактически не привязано к ОС, если не считать «шапки».

А такб вполне себе в юзерспейсе пригодиться может...

shkolnick-kun ★★★★★
()
Ответ на: комментарий от KivApple

в целом, shkolnick-kun правильные замечания высказал насчёт RT. даже добавить нечего, пожалуй.
что касается процессора, то Core-i5 ты используешь не напрямую. а на мелкоконтроллере ты вроде как собираешься выполнять код прямо на железяке, если я правильно понимаю. железяка намного шустрее любой оси, но на мелкоконтроллере не будет таких плюшек, которые предоставляют мощные процы. так что сравнить «насколько круче» вряд ли возможно. это разные процы под разными системами.
однако, на контроллерах начинаются игры с обработкой прерываний. там другие проблемы. там таймеры, тики, надо успевать всё обрабатывать. там никто не запускает стопицот процессов, зато есть всякая периферия, которая требует обработки.

Iron_Bug ★★★★★
()

В общем поигрался я с локфри, не нравится: 1. Мне надо 2-связные списки, а не очереди, список сделать не удалось. 2. В википедии пишут, что не блокирующие алгоритмы в большинстве случаев проигрывают в производительности ладе самым «наивным» реализациям с блокировкой. 3. Размер кода, очевидно, увеличится.

В общем, что для академических целей хорошо, для работы «в поле» - смерть... /thread

shkolnick-kun ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.