LINUX.ORG.RU

Оцените потокобезопасность кода

 


0

2

Есть вот такой код:

Thread * volatile threadQueue[TASK_SCHEDULER_MAX_PRIORITY + 1];
int threadMaxRunningPriority = 0;
Mutex threadQueueMutex;
Thread * volatile threadResumeQueue;
Mutex threadResumeQueueMutex;

void schedulerInit(void) {
	mutexInit(&threadQueueMutex);
	mutexInit(&threadResumeQueueMutex);
}

inline Thread *schedulerFindNextTask(void) {
	Thread *nextThread = NULL;
	if (mutexTryLock(&threadQueueMutex)) {
		for (; threadMaxRunningPriority >= 0; threadMaxRunningPriority--) {
			if (threadQueue[threadMaxRunningPriority] != NULL) {
				nextThread = threadQueue[threadMaxRunningPriority];
				threadQueue[threadMaxRunningPriority] = nextThread->nextScheduled;
				break;
			}
		}
		mutexUnlock(&threadQueueMutex);
	}
	return nextThread;
}

void schedulerResumeTask(Thread *thread);

void schedulerResumeDelayedTasks(void) {
	if (mutexTryLock(&threadResumeQueueMutex)) {
		while (threadResumeQueue != NULL) {
			Thread *thread = threadResumeQueue;
			if (__sync_bool_compare_and_swap(&threadResumeQueue, thread, thread->nextScheduled)) {
				schedulerResumeTask(thread);
			}
		}
		mutexUnlock(&threadResumeQueueMutex);
	}
}

void schedulerResumeTask(Thread *thread) {
	if (mutexTryLock(&threadQueueMutex)) {
		if (threadQueue[thread->priority] == NULL) {
			thread->nextScheduled = thread;
			thread->prevScheduled = thread;
			threadQueue[thread->priority] = thread;
		} else {
			thread->nextScheduled = threadQueue[thread->priority];
			thread->prevScheduled = thread->nextScheduled->prevScheduled;
			thread->nextScheduled->prevScheduled = thread;
			thread->prevScheduled->nextScheduled = thread;
		}
		if (thread->priority > threadMaxRunningPriority) {
			threadMaxRunningPriority = thread->priority;
		}
		mutexUnlock(&threadQueueMutex);
		schedulerResumeDelayedTasks();
	} else {
		do {
			thread->nextScheduled = threadResumeQueue;
		} while (!__sync_bool_compare_and_swap(&threadResumeQueue, thread->nextScheduled, thread));
	}
}

void schedulerSuspendTask(Thread *thread) {
	mutexLock(&threadQueueMutex);
	if (threadQueue[thread->priority] == thread) {
		threadQueue[thread->priority] = thread->nextScheduled;
		if (threadQueue[thread->priority] == thread) {
			threadQueue[thread->priority] = NULL;
		}
		return;
	}
	thread->nextScheduled->prevScheduled = thread->prevScheduled;
	thread->prevScheduled->nextScheduled = thread->nextScheduled;
	mutexUnlock(&threadQueueMutex);
	schedulerResumeDelayedTasks();
}

Поля структуры Thread nextScheduled и prevScheduled описаны как Thread * volatile.

Условия такие:

1) schedulerFindNextTask может прервать любую функцию, кроме самой себя. Ей допустимо иногда возвращать NULL вместо того, чтобы выполнить свою работу.

2) schedulerResumeTask может прервать любую функцию, в том числе саму себя. Она не имеет права блокироваться.

3) schedulerSuspendTask не может прервать никакую функцию, но два потока могут вызвать эту функцию одновременно. Допустима блокировка выполнения.

Больше никакой код к полям nextScheduled и prevScheduled, а также вышеописанным глобальным переменным не обращается.

1) Достаточно ли проверок и условий в этих функциях, чтобы гарантировать, что структура данных (связанный список) не будет никогда повреждена и не возникнет зависания (schedulerFindNextTask и schedulerResumeTask не имеют права блокироваться ни при каких обстоятельствах).

2) Можно ли доверять встроенной функции GCC __sync_bool_compare_and_swap на платформе ARM Cortex-M3?

3) Возможно ли переписать данный код более оптимально, сохранив безопасность?

★★★★★

Последнее исправление: KivApple (всего исправлений: 5)
Ответ на: комментарий от mashina

Можно конкретные замечания, что в моём стиле кодирования плохого? Если будут объективные замечания, я постараюсь их исправить.

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 1)
Ответ на: комментарий от KivApple

Ты не на том форуме вопрос задал, здесь 70% виндузятников и 90% нифига не программистов. Так что врядли тебе по делу ответят, но вероятность есть конечно

Promusik ★★★★★
()
Ответ на: комментарий от KivApple

А можно конкретные замечания?

код практически не читаем..какие-уж тут замечания :-)

PS. если встречается что-то типа «thread->platformData.prevScheduled->platformData.nextScheduled» то это верный признак что надо переписывать.

MKuznetsov ★★★★★
()
Ответ на: комментарий от MKuznetsov

Претензии только к большой вложенности структур? Иных серьёзных замечаний к читабельности нет?

KivApple ★★★★★
() автор топика
Ответ на: комментарий от KivApple

Можно конкретные замечания, что в моём стиле кодирования плохого? Если будут объективные замечания, я постараюсь их исправить.

Да, пожалуйста... Кругом копипаста со специализацией в разных вариантах, например тут

thread->platformData.nextScheduled->platformData.prevScheduled = thread->platformData.prevScheduled;
thread->platformData.prevScheduled->platformData.nextScheduled = thread->platformData.nextScheduled;
копипастишь всюду thread->platformData и, может быть, везде по коду примитивы работы со связанными списками. Кому нужно вчитываться в беспощадные и однообразные простыни текста и отделять примитивы работы со списками от иной логики, тем более везде похожие операции делаются по-разному. Мышиная возьня должна выноситься в отдельные ф-ии чтобы из названия было понятно что это и чтобы не нужно было каждый раз на неё тратить время. Только из-за одной копипасты получаются строки длиной по 100 символов хотя смысловой нагрузки в них практически никакой нет.

Есть в коде куча переменных с префиксом thread, но нет переменных с другим префиксом - нахрена на ровном месте удлиняешь название сущностей? Читабельность от этого значительно снижается, а с ней и снижается вероятность поиска ошибок. Продолжая тему, совершенно бесcмысленно называть переменную threadMaxRunningPriority так длинно. Если есть проблема с коллизией имён где всё это добро помещается, то нужно помещять их в объекты (структуры) с короткими ёмкими названиями. Если очень хочется уточнить за чем переменная нужна, то сделай это коментом к объявлению переменной (полю структуры), а не придумывая дебильные километровые названия.

mashina ★★★★★
()
Ответ на: комментарий от mashina

Удалил из сообщения platformData. Теперь имена стали короче, а логика осталась прежней.

Си не умеет к классы и namespace. Конечно, я могу запихнуть данные планировщика в какую-нибудь структуру, но какая разница между scheduler.maxRunningPriority и threadMaxRunningPriority? Называть переменные односложно, так что потеряется их смысл, я не хочу. Я не из тех программистов, у которых куча переменных i, j, k, m, n и т. д., а потом какая-то чёрная магия. Однобуквенные имена я использую только в for.

Копипасты работы со спискам нет. Добавление в двусвязанный список осуществляется только в schedulerResumeTask, а удаление только в schedulerSuspendTask. Также в schedulerResumeTask осуществляется добавление в односвязанный список threadResumeQueue, а удаление в schedulerResumeDelayedTasks. В каждом случае свой алгоритм (очевидно, что работа с двусвязаным и односвязаным списком отличается, также как отличаются операции добавления и удаления).

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 2)
Ответ на: комментарий от KivApple

Я не из тех программистов, у которых куча переменных i, j, k, m, n и т. д., а потом какая-то чёрная магия. Однобуквенные имена я использую только в for.

Никто ещё об этом никогда не писал, все по какой-то странной причине обходят эту тему в беседах... Но сейчас я постараюсь перешагнуть через предрассудки и озвучить страшную тайну: кроме идиотских названий длиной в половину терминала и однобуквенных переменных есть ещё имена из 2, 3, 4 и даже из 5 и 6 символов.

какая разница между scheduler.maxRunningPriority и threadMaxRunningPriority?

Наверняка их можно сделать статиками и thread опускать. В общем, если избавиться от графомании, то оно мого бы иметь вид вроде inQueWM

mashina ★★★★★
()
Ответ на: комментарий от KivApple

уменьш вложеность

инвертируй в некоторых местах условия для перестановки веток.

основную ветку старайся поднимать в основной поток -(для этого из неосновных веток можно и досрочный выход из функции)

вообще код понятней когда основное тело предвараяется фильтрацией на предусловие(и обработкой если фильтр тормозит состояние)

qulinxao ★★☆
()
Ответ на: комментарий от qulinxao
#define CAS(var, oldValue, newValue) __sync_bool_compare_and_swap(&(var), oldValue, newValue)

typedef struct Item Item;
struct Item {
	Item * volatile next;
	...
};

Item * volatile head = NULL;

void addToList(Item *item) {
	while (true) {
		item->next = listHead;
		if (item->next == NULL) {
			item->next = item;
			if (CAS(listHead, NULL, item)) {
				break;
			}
		} else {
			if (CAS(listHead, item->next, item)) {
				break;
			}
		}
	}
}

void removeFromList(Item *item) {
	while (true) {
		Item *prev = listHead;
		while (prev->next != item) {
			prev = prev->next;
		}
		if (CAS(prev->next, item, item->next)) {
				if (CAS(head, item, item->next)) {
					CAS(head, item, NULL);
				}
				break;
			}
		}
	}
}

Переписал код более-менее абстрактно, чтобы конкретизировать вопрос. Меня сейчас интересует в первую очередь корректность реализации этих двух функций (без учёта вопроса освобождения памяти после удаления элемента - будем считать, что я могу гарантировать, что элемент никто не добавит перед его уничтожением).

А вот полный код - http://pastebin.com/UqsFYGbV

Так лучше?

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 1)
Ответ на: комментарий от KivApple

Во внутреннем цикле в removeFromList время выполнения O(n), результат непиемлемое WCET.

Уж лучше блокировки и\или запрет прерываний, чем это.

shkolnick-kun ★★★★★
()
Ответ на: комментарий от KivApple

Зачем тебе вообще lockfree планировщик?

Lockfree не значит хорошо на самом деле.

Та же операция CAS вполне себе «блокирующая».

shkolnick-kun ★★★★★
()
Ответ на: комментарий от shkolnick-kun

Ты не прав. Цикл while означает «выполнять, пока не получится сделать всё атомарно». То есть, если нас не прерывали, то случится break на первой же итерации. Если прервали, то придётся кое-что переделать (структура данных при этом не будет разрушена).

Добавление (пробуждение ото сна) задачи в список выполняется за О(1).

Вот с удалением есть проблема, потому что надо найти предыдущий элемент (вложенный while). Там сложность O(N). Но отправлять задачу в сон обработчики прерывания не будут, только обычные процессы.

Кстати, N здесь количество не спящих задач с таким же приоритетом, а не общее количество задач.

Я многое переделал в коде. Теперь последний элемент списка указывает на NULL.

http://pastebin.com/EqG53n4F

А вот упрощённый вариант алгоритма добавления-удаления, если лень разбираться, http://pastebin.com/2TvDf0Sy.

У меня срабатывает assert при тестировании. Не могу понять, где ошибка.

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 3)
Ответ на: комментарий от KivApple

Цикл while означает «выполнять, пока не получится сделать всё атомарно». То есть, если нас не прерывали, то случится break на первой же итерации.

Спин блокировка - вид сбоку.

Вот с удалением есть проблема, потому что надо найти предыдущий элемент (вложенный while). Там сложность O(N). Но отправлять задачу в сон обработчики прерывания не будут, только обычные процессы.

Какая разница, где будет превышено максимальнодопустимое время выполнения?

Конец немного предсказуем.

http://pastebin.com/EqG53n4F

Лучше на гитхаб

shkolnick-kun ★★★★★
()
Ответ на: комментарий от KivApple

У меня срабатывает assert при тестировании. Не могу понять, где ошибка

Ну так до CAS(list.head, item->next, item) в общем случае (item != head), т.к. List.head еще не равно item.

shkolnick-kun ★★★★★
()
Ответ на: комментарий от shkolnick-kun

Я бы сказал не в общем случае, а вообще такого не должно быть, чтобы item == head до присваивания. Ведь голова списка по определению указывает на такой элемент, который в списке есть. А именно последующий CAS является добавлением элемента в список, до этого элемент списку не принадлежит. И assert(item != head) вызывает падение программы лишь при условии, что item == head. То есть возникает невозможная ситуация, которая возможна лишь при разрушении списка.

Спин блокировка - вид сбоку.

Не совсем. Вот смотри. В той же ChibiOS/RT после каждого прерывания запускается перепланирование задач, если что-то изменилось. У меня перепланирование (переход на следующую итерацию цикла while вместо выхода из него) происходит только если случилось прерывание во время планирования. То есть просто к стоимости каждого прерывания добавляется стоимость новой итерации цикла. С учётом того, что цикл маленький, эта добавленная стоимость будет очень мала. То же касается и функций добавления/удаления. К стоимости прерывания добавляется стоимость перезапуска всех функций добавления/удаления, а также планировщика, которые он прервал.

И все эти циклы гарантированно завершаться как только перестанут приходить прерывания. Однако если прерывания приходят так часто, что даже коротенькие функции не успевают дойти до конца, то система явно перегруженна, а в таких условиях никакая RTOS ничего не гарантирует (кроме того, что высокоприоритетные прерывания, не использующие функции RTOS, будут нормально работать, но у меня тоже так).

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 1)
Ответ на: комментарий от KivApple

Если падает, значит есть две конкурентные попытки вставить один и тот же элемент в один и тот же список.

Значит алгоритм кривой, раз такое можно сотворить.

Пишите, Шура, пишите... (С)

anonymous
()
Ответ на: комментарий от anonymous

А вот и не угадал. Сделал такую штуку. Добавление элемента - lock-free. Перед удалением хватаю mutex, а после удаления отпускаю. То есть удаление не может прерывать удаление, но может прерывать добавление, а добавление может прерывать и само себя и удаление.

И всё работает. А если сделать наоборот (lock-free удаление и добавление с mutex), то ошибка не исчезает. Значит ошибка таки в функции удаления.

KivApple ★★★★★
() автор топика
Ответ на: комментарий от KivApple

В той же ChibiOS/RT после каждого прерывания запускается перепланирование задач, если что-то изменилось.

Это правильно.

А вот и не угадал. Сделал такую штуку. Добавление элемента - lock-free. Перед удалением хватаю mutex, а после удаления отпускаю. То есть удаление не может прерывать удаление, но может прерывать добавление, а добавление может прерывать и само себя и удаление.

А вот тут фейл. Надо брать отладчик и выяснять «что пошло не так», а ты пытаешься методом тыка определить «как правильно».

И да, у тебя во время траверса этого твоего стека не захватываются блокировки элементов, что может привести к повреждению этого самого стека. С другой стороны, если ты будешь их захватывать, может случиться дэдлок.

Вот поэтому то и нужна блокировка всего стека.

shkolnick-kun ★★★★★
()
Ответ на: комментарий от shkolnick-kun

Я смог таки запилить lock-free планировщик - http://pastebin.com/HirFnhMb. Правда он немного странный. Я использую ленивое усыпление нитей. То есть усыпление по факту приводит лишь к выставлению спец. флага. А планировщик при попытке переключиться на процесс с таким флагом удаляет его из очереди и идёт к следующему. Если нить пробудят до того, как до неё дойдёт очередь, то никаких операций со списком не потребуется.

Прогнал некоторые тесты. Вот результаты (всё время в наносекундах, график построен по первому блоку тестов):

https://pp.vk.me/c627616/v627616975/650cd/ofCdRyvTK-o.jpg

Процессор - Core i5-4210U (4 ядра). 1 поток (настоящий, линуксовый) 10 миллионов раз вызывает schedulerNextThread. 3 потока (тоже настоящих) в бесконечном цикле (точнее пока не закончит выполнение главный поток) генерируют 2 случайных числа. Первое - номер потока (не настоящего, а моего), второе - действие (отправить в сон или наоборот пробудить). И выполняют это действие с указанным потоком, замеряя время выполнения действия. При этом если поток уже был в нужном состоянии или заблокирован другой операцией (если кто-то уже начал добавлять/удалять поток, то функция выходит ни с чем - кто первый начал, тот и прав), то её время выполнения не учитывается в общей статистике. Ах да, процесс запускаю от root и с nice -19.

Можно заметить, что время исполнения Suspend и Resume практически не зависит от количества потоков, а время поиска нового потока зависит примерно линейно. Впрочем, это так лишь в худшем случае, когда планировщику приходится за раз усыплять кучу потоков, прежде чем он найдёт бодрствующий.

На мой взгляд условия теста более жёсткие, чем бывает на микроконтроллерах в реальных задачах, так что для грубой оценки сверху вполне подходят (весьма сомнительно, что в штатных условиях работы, может быть результат хуже). А ещё ведь микроконтроллер одноядерный и уровень вложенности вызовов функций suspend/resume весьма ограничен, так что потери на CAS должны быть значительно меньше. Resume должен выполняться быстрее, чем FindNext.

Получается, что среднее время работы планировщика на Core i5 - 50 наносекунд (вряд ли кто-то будет запускать на МК сотни процессов, да ещё и непрерывно будить и усыплять их, спящие потоки ресурсов кроме памяти не жрут, ибо удалены из очереди). Есть идеи, как можно очень грубо перевести это в ситуацию с микроконтроллером? Во сколько раз мой Core i5 круче какого-нибудь STM32F103?

P.S.: Проверял корректность работы планировщика на большем числе итераций (вплоть до 10 миллиардов) с различными уровнями оптимизации (O0, O2 и O3). Ни при каких обстоятельствах не происходит разрушения данных или зависания. А функция замера времени отбрасывает оверхед от вызова функций получения timestamp.

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 2)
Ответ на: комментарий от KivApple

А планировщик при попытке переключиться на процесс с таким флагом удаляет его из очереди и идёт к следующему.

Подобную технику использовали в L4, она приводит к опасности отказа в обслуживании, поэтому в seL4 от нее отказались.

Если работает - пока не трогай. Но лучше все таки найти замену.

shkolnick-kun ★★★★★
()
Ответ на: комментарий от KivApple

в целом, shkolnick-kun правильные замечания высказал насчёт RT. даже добавить нечего, пожалуй.
что касается процессора, то Core-i5 ты используешь не напрямую. а на мелкоконтроллере ты вроде как собираешься выполнять код прямо на железяке, если я правильно понимаю. железяка намного шустрее любой оси, но на мелкоконтроллере не будет таких плюшек, которые предоставляют мощные процы. так что сравнить «насколько круче» вряд ли возможно. это разные процы под разными системами.
однако, на контроллерах начинаются игры с обработкой прерываний. там другие проблемы. там таймеры, тики, надо успевать всё обрабатывать. там никто не запускает стопицот процессов, зато есть всякая периферия, которая требует обработки.

Iron_Bug ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.