LINUX.ORG.RU

Многопоточность Linux. Mutex'ы


0

2

Доброго всем здоровья!)

Для предметности: есть 101 работающий поток. Из них 100 потоков «что-то» делают, а результат деятельности пишут в одну и ту же переменную. Оставшийся 1 поток («главный поток») следит за тем, чтобы после завершения любого из потоков создавался новый и сам тоже пишет данные в общую переменную. Для правильной борьбы за право писать в переменную установим мьютекс для общей переменной.
Если «главный поток» запросит блокировку(мьютекс) общей переменной (и будет блокирован до освобождения переменной), то совсем неизвестно, сколько он прождет своей очереди, а тем временем количество потоков будет падать.
Подскажите пожалуйста как или чем организовать «очередь на использование переменной» для 100 потоков и как дать право «главному потоку» получать доступ без ожидания своей очереди? Ведь мьютексы никак не определяют очередность(
Спасибо!

Очевидное решение - использовать два вложенны mutex'а

int varl
mutex m1;
mutex m2;

void lock() {
   lock(m1);
}

void unlock() {
   unlock(m1);
}

void lock_child() {
   lock(m2);
   lock(m1);
}

void unlock_child() {
   unlock(m1);
   unlock(m2);
}

Дочерние потоки используют для блокировки lock_child/unlock_child; а основной процесс использует lock/unlock. В результате дочерние процессы будут блокироваться на m2; и размер очереди на m1 не превысит 1, поэтому основной поток встанет первым блокировку на блокировку m1.

Nastishka ★★★★★
()

101 поток - это что-то жуткое даже для восьмиядреного нехалема, хотя сам работал с таким приложением :)

А нельзя разделить главный поток на два: (1) тот, кто следит за воскрешением; (2) тот, кто пишет в переменную? Вообще, нельзя ли сделать один поток специально для того, чтобы он брал задания из своей очереди и писал экслюзивно в переменную, а остальные потоки асинхронно посылали бы ему задания? У меня такая чувство, что если делать так, как у тебя описано, то блокировка главного потока может быть неконтролируемо долгой...

dave ★★★★★
()
Ответ на: комментарий от Nastishka

А если после дочернего потока обе блокировка возьмет другой дочерний, и главный поток опять останется заблокированным?

dave ★★★★★
()
Ответ на: комментарий от Nastishka

Спасибо! Я думал об этом)
Т.е.: дочерний поток держит 2 мьютекса (m1 и m2). В это время «главный поток» находится в состоянии блокирования и ждет своего m1. После завершения работы дочерний поток
1) Освобождает m1
2) Освобождает m2
Следующий дочерний поток
3) Захватывает m2
4) Пытается захватить m1

Разве время, которое прошло от 1) до 3) гарантирует нам, что «главный поток» захватит m1?

nitroxolyne
() автор топика
Ответ на: комментарий от dave

Согласен с товарищем dave. Время на освобождение ресурса дочерним потоком не дает гарантий захвата ресурса «главным потоком»

nitroxolyne
() автор топика
Ответ на: комментарий от dave

> А если после дочернего потока обе блокировка возьмет другой дочерний

Не возьмет. Родительский поток 1, дочерние 2,3,4

Дочерний 2 захватил m2 и m1:
m1[2]
m2[2]

Дочерний 3 застревает на m2:
m1[2]
m2[2]3

Родительский пытается захватить m1:
m1[2]1
m2[2]3

Дочерний 4 застревает на m2:
m1[2]1
m2[2]34

Дочерний 2 освобождает m1,m2, очереди сдвигаются:
m1[1]3
m2[3]4

Если не изобретать велосипедов и использовать для ожидания mutex'а блокирующий вызов, то так оно и будет.

Nastishka ★★★★★
()
Ответ на: комментарий от Nastishka

Я небольшой специалист, но если на практике за то время пока
Дочерний 2 освобождает m1,m2 и дочерний 3 захватывает m2 и «пытается» захватитть m1, главный успевает захватить m1 -РАБОТАЕТ, то ничего против и БОЛЬШОЕ ВСЕМ СПАСИБО ЗА ПОМОЩЬ!

nitroxolyne
() автор топика
Ответ на: комментарий от Nastishka

Непереносимо, ведь нет никаких гарантий на счет того, как устроена очередь ожидания.

Я тут подумал. В принципе можно сделать так, чтобы главный поток получал приоритет в выполнении. Для этого надо использовать условия (как минимум, два) и две вложенных блокировки (внешняя/внутренняя). Внутри внешней переводить поток в ожидание и уведомлять другие потоки в случае необходимости. Две разных версии для главного и дочернего потоков. Только геморра много, но написать, думаю, можно. Еще надо проверить на дедлоки. Но это неправильный путь. На мой взгляд лучше бы выделить отдельный поток, который бы только менял переменную. Упростить архитектуру.

dave ★★★★★
()
Ответ на: комментарий от dave

> лучше бы выделить отдельный поток, который бы только менял переменную. Упростить архитектуру.

Не всегда возможно. Существует очень ненулевая вероятность того, что эта самая «переменная» использутся для расчета значения, передаваемого в новосоздаваемую нить - например «переменная» это соединение с базой данных, при завершении потока в базу сбрасывается результат расчета, перед стартом нового потока из базы берется ID для нового результата - тогда вся идея с асинхронной работой с «переменной» рассыпается как карточный домик.

Nastishka ★★★★★
()
Ответ на: комментарий от nitroxolyne

Еще один вопрос поводу очереди на право записи:

Если есть 100 потоков и 1 ресурс. Нужно, чтобы потоки получали право на блокировку ресурса(на запись) по очереди. Конечно, можно описать эту ситуацию с помощью алгоритма «с двумя мьютексами» как описано выше(здесь их будет 100), но есть ли другой способ?

nitroxolyne
() автор топика
Ответ на: комментарий от dave

> по-моему гарантий нет

Вообще говоря, в Win32 схема именно такая, а в pthread есть множество всяческих SCHED_FIFO и SCHED_RR, которые позволят если не реализовать очередь, то как минимум объявить главный поток «более приоритетным».

Nastishka ★★★★★
()
Ответ на: комментарий от dave

> Еще надо проверить на дедлоки

Не надо. Достаточно просто блокировать строго в определенном порядке и разблокировать строго в обратном :-)

Nastishka ★★★★★
()
Ответ на: комментарий от nitroxolyne

Предполагаю, что дочерний, после освобождения m1 должен передать управление планировщику CPU.

anonymous
()
Ответ на: комментарий от anonymous

чем не годна такая схема?



char volatile chlock=0;  //--- глобальная ---

    //--- child ---

    lock(m2);        //--- вошли во внутренний двор (по одному) ---
    if(chlock != 0){ //--- а не заявил ли о себе главный? ---
        lock(m3);    //--- если да, то ждем освобождения ---
        unlock(m3);
    }
    lock(m1);
    //--- ... ---
    unlock(m1);
    unlock(m2);

    //--- parent ---

    lock(m3);         //--- заряжаем мутекс для блокировки дочернего ---
    сhlock = 1;       //--- заявляем о себе ---
    lock(m1);
    //--- ... ---
    unlock(m1);
    сhlock = 0;
    unlock(m3);


oleg_2
()

вы не поверите, но это делается всего двумя мютексами. обращения типа try_lock идут посредством арбитра, который перед тем как заблокировать ресурс проверяет что пришло время сделать это

anonymous
()
Ответ на: комментарий от oleg_2

> чем не годна такая схема?

линус наш торвальдс тоже считал что года, а в том, что она не работает, виноваты разработчики gcc

anonymous
()
Ответ на: комментарий от anonymous

вы не поверите, но это делается всего двумя мютексами. обращения типа

try_lock идут посредством арбитра, который перед тем как заблокировать

ресурс проверяет что пришло время сделать это


Можете пояснить, что за арбитр?

nitroxolyne
() автор топика
Ответ на: комментарий от mv

А можно подробнее о lockless queue?

А можно подробнее о lockless queue? Чето ниче я толком по этим ключевым словам не нагуглил(

nitroxolyne
() автор топика
Ответ на: А можно подробнее о lockless queue? от nitroxolyne

Товарищ mv, скорее всего имел ввиду heavylockless queue. Для обеспечения потокобезопасности все равно будет какой нибудь способ синхронизации или атомарности, например спинлоки или CAS.

baverman ★★★
()
Ответ на: А можно подробнее о lockless queue? от nitroxolyne

В любом случае, ты не должен хотеть сто потоков залоченных на разделяемую переменную. Ни за что не поверю что твоя задача не решается парой очередей и пулом воркеров. Не надо городить сложную для понимания и тестирования хуергу.

baverman ★★★
()
Ответ на: комментарий от baverman

В том то и дело, что не хочу городить ничего сложного для понимания. Разделил общую переменную между «главным потоком» и «дочерними потоками» самым очевидным (способом, о котором выше писала Nastishka). Это позволяет приоритетно получить блокировку главного потока, но к сожалению, не решает проблему очереди к ресурсу между дочерними потоками. Было бы идеально, если бы для общей переменной была своя очередь. Каждый, поток, который претендует на ее захват, должен «добавить себя» (например, ID потока и приоритет ) в очередь и ждать, пока не «увидит себя» в голове очереди. «Увидев себя в голове», выполняет критич секцию и «извлекает себя» из очереди. И если бы операции добавления в очередь и извлечения были атомарными.... Бы.. бы.. бы.. бы.. бы.. бы.. бы.. бы.. бы..))))))))))

nitroxolyne
() автор топика

Советую посмотреть у Стивенса (UNIX: взаимодействие процессов), там для вашего случая предложено несколько готовых схем. Одна из них - использование RW/RD мьютексов, т.е. создание двух мьютексов, один из которых используется для write-only блокировки, а второй - для read-only. Кроме того, можно использовать условные переменные и сигнализировать о разблокировании ваших критических данных. Есть и множество других способов...

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от nitroxolyne

В том то и дело, что не хочу городить ничего сложного для понимания

Для твоего понимания. Такое ощущение, что в качестве инструмента для синхронизации ты знаешь только мьютекс и теперь пытаешься его пихнуть во все щели.

Я не знаю есть ли для сишников такая же хорошая вещь, как Java Concurrency in Practice, но в том, что ты нуждаешься в базовых знаниях, сомнений никаких нет. Спроси у лора лучше книги, а не как расшарить бедную переменную между сотней потоков.

baverman ★★★
()
Ответ на: комментарий от baverman

Я и говорю: Стивенса почитать надо обязательно. Может, придет в голову что-нибудь интересное...

Eddy_Em ☆☆☆☆☆
()
Ответ на: А можно подробнее о lockless queue? от nitroxolyne

А можно подробнее о lockless queue? Чето ниче я толком по этим ключевым словам не нагуглил(

Как-то так...

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>

void *swap(void *ptr, void *new)
{
    void *old;
    asm volatile (
        "xchg %%rsi, (%%rdi)\n"
        : "=S"(old)
        : "D"(ptr), "S"(new)
        : "memory"
        );
    return old;
}

typedef struct {
    void *payload;
    void *next;
} qmem_t;

qmem_t head;
qmem_t *queue;

qmem_t *qmem_new()
{
    qmem_t *ptr = calloc(sizeof(qmem_t), 1);
    assert(ptr);
    return ptr;
}

void qmem_push(qmem_t **prev, qmem_t *ptr)
{
    qmem_t *old = swap(prev, ptr);
    if (old)
        old->next = ptr;
}

volatile int count;

void *foo(void *arg)
{
    int i;
    char buf[256];
    snprintf(buf, sizeof(buf), "hello from thread %d", (int)arg);
    for (i = 0; i < 100000; i++) {
        qmem_t *new = qmem_new();
        new->payload = buf;
        qmem_push(&queue, new);
        usleep(10000);
    }
    count--;
}

#define N 100

int main()
{
    int i;
    pthread_t thr[N];

    qmem_push(&queue, &head);

    count = N;
    for (i = 0; i < N; i++)
        pthread_create(&thr[N], NULL, foo, (void *)i);

    do {
        qmem_t *ptr = head.next;

        while (ptr) {
            qmem_t *tmp = ptr;
            if (!ptr->next)
                head.next = ptr;
            else {
                printf("%s\n", ptr->payload, ptr->next);
                free(ptr);
            }
            ptr = tmp->next;
        }
    } while (count);

}

mv ★★★★★
()
Ответ на: комментарий от Eddy_Em

А у вас там разве не может случиться, что несколько потоков будут одновременно менять указатели?

Нет.

mv ★★★★★
()
Ответ на: комментарий от mv

Т.е. вы полагаете, что функция swap будет выполняться атомарно и в единственном экземпляре (в многопроцессорной или многоядерной системе)?

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от mv

>> Все эти якобы «lockless» алгоритмы - профанация.

Почему?

Потому что в лучшем случае это аппаратно реализованная блокировка.

tailgunner ★★★★★
()
Ответ на: комментарий от tailgunner

Потому что в лучшем случае это аппаратно реализованная блокировка.

Ну и что? В реальной жизни для толстяка в одну дверь одновременно тоже не проходят, но никто ж не умирает от этого :)

mv ★★★★★
()
Ответ на: комментарий от Eddy_Em

Т.е. вы полагаете, что функция swap будет выполняться атомарно и в единственном экземпляре (в многопроцессорной или многоядерной системе)?

Атомарно, в любом количестве экземпляров.

mv ★★★★★
()
Ответ на: комментарий от mv

Просто я в ассемблере не силен. Получается, ваш ассемблерный код приостанавливает выполнение всех потоков, пока меняет местами указатели?

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

Просто я в ассемблере не силен. Получается, ваш ассемблерный код приостанавливает выполнение всех потоков, пока меняет местами указатели?

Типа того. Атомарные команды типа xchg блокируют шину на время своего выполнения, остальные потребители ждут разблокировки.

mv ★★★★★
()
Ответ на: комментарий от mv

>> Потому что в лучшем случае это аппаратно реализованная блокировка.

Ну и что?

Ну и какой он после этого «lockless?»

tailgunner ★★★★★
()
Ответ на: комментарий от mv

Понятно. Но, все-таки, с мьютексами или семафорами код проще воспринимается. Хотя, конечно, в вашем случае нет шансов «подвисшей блокировки».

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от tailgunner

Ну и какой он после этого «lockless?»

Таким макаром можно и программиста электриком назвать =) Ну а чё? Коммутирует электрические линии же...

mv ★★★★★
()
Ответ на: комментарий от Eddy_Em

Понятно. Но, все-таки, с мьютексами или семафорами код проще воспринимается. Хотя, конечно, в вашем случае нет шансов «подвисшей блокировки».

Ещё и сильно быстрее. А непонятный код можно обернуть в привычный queue.

mv ★★★★★
()
Ответ на: комментарий от mv

> Таким макаром можно и программиста электриком назвать =)

Ну да. Если называть вещи тем, чем они не являются, то можно и луну назвать зеленым сыром.

tailgunner ★★★★★
()
Ответ на: комментарий от tailgunner

(капризно) Нет, пусть она будет hardware-locked %)

-and-unlocked! Чтобы не вызывать сомнения.

mv ★★★★★
()
Ответ на: комментарий от mv

не, я согласен с tailgunner. нужно избегать, по возможности,
таких вот «lockless» решений. реальный профит получить трудно,
а вот гемора - легко.

вот даже этот тривиальный код не сразу понятен. и, похоже,
не без проблем.

do {

qmem_t *ptr = head.next;




while (ptr) {


qmem_t *tmp = ptr;


if (!ptr->next)


head.next = ptr;



мы «теряем» последний элемент. он будет виден только после
следующего qmem_push().

нехорошо, а исправить нелегко. если мы не будем его пропускать,
qmem_push() становится racy.

кроме того, в теории нам нужен read_barrier_depends(ptr), но
это ладно, экзотика.

idle ★★★★★
()
Ответ на: комментарий от idle

не, я согласен с tailgunner. нужно избегать, по возможности,

таких вот «lockless» решений. реальный профит получить трудно, а вот гемора - легко.

А какой реальный профит от мьютекса, нет, двух вложенных мьютексов? Геморра от них ну вот точно больше, чем пользы. Можно ещё priority мьютекс вкорячить, чтобы 101-й тред читать нормально мог, вот где весело будет! ;)

мы «теряем» последний элемент. он будет виден только после

следующего qmem_push().

Я ждал, если кто заметит :) Там ещё один баг есть, который проявится, если начать память активно выделять/освобождать. Си - зло, что говорить?..

нехорошо, а исправить нелегко. если мы не будем его пропускать,

qmem_push() становится racy.

Да ладно...

--- 123.c    2010-09-18 09:29:34.270001448 +0200
+++ 123.c    2010-09-18 09:29:43.150001236 +0200
@@ -2,6 +2,7 @@
 #include <stdlib.h> 
 #include <pthread.h> 
 #include <assert.h> 
+#include <string.h>
  
 void *swap(void *ptr, void *new) 
 { 
@@ -46,7 +47,8 @@ void *foo(void *arg)
     snprintf(buf, sizeof(buf), "hello from thread %d", (int)arg); 
     for (i = 0; i < 100000; i++) { 
         qmem_t *new = qmem_new(); 
-        new->payload = buf; 
+        new->payload = strdup(buf);
+        assert(new->payload);
         qmem_push(&queue, new); 
         usleep(10000); 
     } 
@@ -71,14 +73,18 @@ int main()
  
         while (ptr) { 
             qmem_t *tmp = ptr; 
-            if (!ptr->next) 
-                head.next = ptr; 
-            else { 
-                printf("%s\n", ptr->payload, ptr->next); 
-                free(ptr); 
+            ptr = ptr->next;
+
+            if (tmp->payload) {
+                printf("%s\n", tmp->payload);
+                free(tmp->payload);
+                tmp->payload = 0;
             } 
-            ptr = tmp->next; 
+
+            if (!ptr)
+                head.next = tmp;
+            else
+                free(tmp);
         } 
     } while (count); 
- 
 }

Два косяка, один из которых не виден, для сишной программы, работающей с потоками и памятью, да наваянной утром во время завтрака - это отличный результат! :)

mv ★★★★★
()
Ответ на: комментарий от mv

> А какой реальный профит от мьютекса, нет, двух вложенных

мьютексов? Геморра от них ну вот точно больше, чем пользы.


почему два-то? главный профит - код станет проще. что, как
ты прекрасно знаешь, особенно важно для долгоживущего кода.

будет ли он медленнее? а вот это нужно доказывать на практике,
и только потом оптимизировать узкие места.

вот смотри. у тебя consumer просто тупо жжет CPU ожидая
!NULL. я все понимаю, этот код демонстрационный. на практике
нам понадобится sleep/wakeup, контроль за переполнением
(если consumer не успевает), еще что-то. сомнитительный
выигрыш от xchg() затеряется в шуме, и нам, скорее всего,
все равно понадобится какой-то locking, в одну из критических
секций можно будет заодно всунуть и добавление элемента.

но настаивать не буду. по себе знаю, всегда хочется поискать
себе проблем ;)

я просто считаю, что начинать с таких вот оптимизаций не
следует (как правило), и уж точно не посоветовал бы это ТС.

Си - зло, что говорить?..


что я слышу? ты, наконец, готов признать, что perl рулит???

Да ладно...


кхм... не уверен, что я понимаю, как выглядит код в результате...
но, вроде да, последний элемент у нас провисает все равно,
но ->payload мы обрабытываем. и, оставаясь демонстрационным,
код стал еще нетривиальнее ;)

а теперь посмотрим на другой, менее очевидный, недостаток такого
подхода. вот у нас 100 producers, каждый старается впихнуть новый
элемент. мы ведь предполагаем contention, иначе зачем вообще
оптимизировать?

представь, что 1-ый thread исчерпал свой time slice в qmem_push()
прямо перед «old->next = ptr;». теперь остальные потоки могут
делать push() до посинения, но результат не будет виден пока
1-ый не получит CPU назад. я, разумеется, не утверждаю, что это
_всегда_ неприемлемо (иногда так даже и лучше), но это нужно
учитывать.

в случае же семафора другой (другие) producer не сможет устроить
такое подобие livelock.

на итог: все что я хотел бы сказать - семь раз отмерь ;)

idle ★★★★★
()
Ответ на: комментарий от idle

вот смотри. у тебя consumer просто тупо жжет CPU ожидая

!NULL.

Нет. Читает до конца (на момент чтения), и всё. Это не очередь, а, скорее, механизм, лежащий между push и pop.

я все понимаю, этот код демонстрационный. на практике

нам понадобится sleep/wakeup,

У тебя проф.деформация, это тут точно не нужно =)

контроль за переполнением (если consumer не успевает),

И что? Обычная очередь также взорвётся, если кладут больше, чем берут.

еще что-то. сомнитительный выигрыш от xchg() затеряется в шуме, и нам, скорее всего, все равно понадобится какой-то locking, в одну из критических секций можно будет заодно всунуть и добавление элемента.

Я не согласен. В жизни проблем от локов не меньше, чем от рейсов. В случае с топикстартером мьютекс явно проблему не решит, плюс добавит новую (консьюмер будет голодать в борьбе за мьютекс). Мой код проблему на концептуальном уровне решает, плюс, если в суть проблемы вникать, он достаточно очевидный.

я просто считаю, что начинать с таких вот оптимизаций не

следует (как правило), и уж точно не посоветовал бы это ТС.

Talk is cheap. Show me the code. (c) Torvalds

Реши проблему ТС на локах, покажи код.

что я слышу? ты, наконец, готов признать, что perl рулит???

Между си и перлом, если выбирать, на чём написать лисп, чтобы сделать основную работу, я, наверное, выберу перл ;) Хотя, если можно использовать бизон...

кхм... не уверен, что я понимаю, как выглядит код в результате...

но, вроде да, последний элемент у нас провисает все равно, но ->payload мы обрабытываем. и, оставаясь демонстрационным, код стал еще нетривиальнее ;)

Ты, наверное, не заглядывал в исходники плюсовых контейнеров, и не знаешь, как страшно может выглядеть очередь =)

представь, что 1-ый thread исчерпал свой time slice в qmem_push()

прямо перед «old->next = ptr;». теперь остальные потоки могут делать push() до посинения, но результат не будет виден пока 1-ый не получит CPU назад. я, разумеется, не утверждаю, что это _всегда_ неприемлемо (иногда так даже и лучше), но это нужно учитывать.

Представь, что в лочном решении тред тоже заснул в push. Всё стало гораздо хуже: остальные треды висят на локе, пока этот засранец не вернёт его.

Жду твой код :)

mv ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.