LINUX.ORG.RU

Потоки и мутексы в Си

 ,


0

1

Нуждаюсь в ликбезе )

Раньше как то приходилось использовать только в С++ для контроля изменении в объекте класса, а что то в процедурном варианте не соображу как безопасно?

Есть 2 потока декодирования и кодирования, в неком участке кода декодирования мне нужно скопировать память для потока кодирования, понятно что это место нужно защитить мьютексом и хотелось бы это сделать в потоке кодирования, однако как правильно обрабатывать

  1. переменную int состояния ошибок в потоках - нужно ли 2 таких переменных (ошибка в любом потоке должна останавливать оба), можно ли писать и читать из разных потоков одновременно без мутексов когда их 2 (каждый либо пишет, либо читает), чтобы не тормозить выполнение?

  2. потоки должны ждать друг друга - после декодирования когда скопируется память, после кодирования когда будут готовы новые данные для копирования - получается еще 2 переменных!? каждую тоже нужно защищать мутексом? или как в п.1 раз один поток только пишет, а другой только читает можно без них обойтись?

или вообще какой то спецфический тип мутекса нужен!?

ПС. а то у меня получается 3 или 5 мутексов мне нужно чтоб каждую операцию обрабатывать.

★★★

Последнее исправление: wolverin (всего исправлений: 1)

Ответ на: комментарий от yoghurt

очереди для копируемой памяти? я думал об этом, но не смотря на выигрыш во время заполнения очереди, я все равно приду к моменту полного заполнения очереди, когда один поток тормозит другой в силу того, что пока у меня поток декодирования самый медленный

wolverin ★★★
() автор топика

По описанию задачи вроде можно выкинуть потоки и сделать просто поочередное выполнение. И ничего копировать не надо. Ладно бы там сразу 100500 потоков декодилось, и ещё десяток собирал что там надекодилось и по меде руступления тоже молотил по своему. Но тут же просто 2 потока дрыгают одно и тоже по очереди.

LINUX-ORG-RU ★★★★★
()
Последнее исправление: LINUX-ORG-RU (всего исправлений: 1)
Ответ на: комментарий от LINUX-ORG-RU

можно конечно, так и сделано сейчас, но мне нужно сделать быстрее хотя бы на 30-40%, декодирование занимает 70-80 мс, а кодирование 50 мс за один блок (вообще речь про видео кадры)

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от LINUX-ORG-RU

не, ждет только копирование памяти

wolverin ★★★
() автор топика

У тебя голая сишка? Возьми libdispatch от яблок и посмотри как у них решается типичная проблема producer - consumer. Всё будет довольно легко, там либа на очередях основана и без мьютексов ты сможешь решить свою задачу. Рекомендую использовать с ней Clang

menangen ★★★★★
()

Кто от кого зависит — энкодер от декодера или наоборот? Куда копируется результат после окончания работы обоих тредов?

deep-purple ★★★★★
()

У тебя очень непонятное описание. Мне кажется у тебя какое-то странное представление о мютексах.

firkax ★★★★★
()
Ответ на: комментарий от wolverin

mutex будет блокировать тебе всё. На то он и mutex. Если декодирование занимает 80 ms то всё это время будет ждать кодировщик если захочет сделать pthread_mutex_lock(&mutex); перед обращением к данным. И когда он это сделает если декодировщик снова захочет передать ему данные и тоже сделает pthread_mutex_lock(&mutex); то он будет ждать, так и будут они ждать друг дружку как только будут притрагиваться к mutex дрыгание которого означает для тебя что несколько параллельных штук должны обращаться к чему то одному по очереди, а значит они будут ждать. кодировщик не получит данные от декодировщика раньше так как он ждёт pthread_mutex_unlock(&mutex); от него.

мутексы создают последовательную работу в паралельных процессах при доступе к общему ресурсу. Как то так

не компилял, писал в слепую псевдокод

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <pthread.h>

enum
{
    HAPPY,
    ENCODER_ERROR,
    DECODER_ERROR,
    DECODER,
    ENCODER,
    OBSERVER,
};

typedef struct {
    uint8_t * data;
    size_t    size;
    struct
    {
        uint8_t owner;
        uint8_t status;
    }metadata;
}shared_memory_t;

static shared_memory_t shared_memory = {NULL,0,OBSERVER,HAPPY};

pthread_mutex_t mutex;

void observer(void)
{
    pthread_mutex_lock(&mutex);

    switch(shared_memory.metadata.status)
    {
        case HAPPY:
        {
            pthread_mutex_unlock(&mutex);
        }break;

        default: switch(shared_memory.metadata.owner)
        {
            case DECODER:
            {
                fprintf(stderr,"DECODER ERROR code %d",shared_memory.metadata.status);
                //делаешь что-то
                exit(1);
            }break;

            case ENCODER:
            {
                fprintf(stderr,"ENCODER ERROR code %d",shared_memory.metadata.status);
                //делаешь что-то
                exit(2);
            }break;
        }
    }
}

void * decoder(void * args)
{
    while(1)
    {
    //делает что-то
    //теперь хочет память скопировать для encoder 
    pthread_mutex_lock(&mutex);
    if(shared_memory.data == NULL && shared_memory.size == 0 &&
       shared_memory.metadata.owner  == OBSERVER && 
       shared_memory.metadata.status == HAPPY)
    {
    shared_memory.data = malloc(100500);
    shared_memory.size = 100500;
    //копирует данные в эту память для encoder
    shared_memory.metadata.owner  = DECODER;
    shared_memory.metadata.status = HAPPY;
    }else{
        shared_memory.metadata.owner  = DECODER;
        shared_memory.metadata.status = DECODER_ERROR;
    }
    observer();
    //дальше что-то делает
    }

}

void * encoder(void * args)
{
    while(1)
    {
    //вдруг хочет шареных данных
    pthread_mutex_lock(&mutex);
    if(shared_memory.size !=0 && shared_memory.data != NULL &&
       shared_memory.metadata.owner  == OBSERVER            && 
       shared_memory.metadata.status == HAPPY)
    {
        //обрабатывает данные после decoder и потом
        free(shared_memory.data);
        shared_memory.size = 0;
        shared_memory.metadata.owner  = ENCODER;
        shared_memory.metadata.status = HAPPY;
    }else{
        shared_memory.metadata.owner  = ENCODER;
        shared_memory.metadata.status = ENCODER_ERROR;
    }
    observer(); 
    //дальше что-то делает
    }
}



int main(int argc, char *argv[])
{
    pthread_t threads[2];
    pthread_mutex_init(&mutex, NULL); 
    pthread_create(&threads[0], NULL, encoder , NULL);
    pthread_create(&threads[1], NULL, decoder , NULL);
    pthread_mutex_destroy(&mutex);

    pthread_join(threads[0], NULL);
    pthread_join(threads[1], NULL);

    // ну короче ты понял
    return 0;
}

В самом начале тебе предложили очереди. Суть в том что ты выделяешь кусочки памяти в декодере копируешь туда данные и отправляешь в очередь (массив) а енкодер по мере поступления от туда эти кусочки берёт и делает что-то с ними.

В виде очереди может быть стек,пулл,кольцевой буфер или типа того. Таким образом получится неблокируемая работа всего, сделал pthread_mutex_lock(&mutex); передал в стек указатель на кусок памяти сделал сразу pthread_mutex_unlock(&mutex); и поехал дальше, другая нить захочет данные просто сделает pthread_mutex_lock(&mutex);data=pop() pthread_mutex_unlock(&mutex); и всё. Только надо не LIFO а FIFO стек.

Но если хочешь именно доступ к 1 кусочку памяти и шарить его то либо как сказал выкинуть треды нафиг и делать последовательно или вешать 1 лок на всё и опять делать последовательно. Помойму иначе никак.

ПЫСЫ я не программист может и не прав, но имхо прав =)

LINUX-ORG-RU ★★★★★
()
Последнее исправление: LINUX-ORG-RU (всего исправлений: 2)

Как уже правильно сказали, то что тебе нужно называется очередь Или вообще канал (pipe). Ты можешь прогонять через него ссылки на буферы или вообще данные (через канал). При работе через канал у тебя вообще всё будет просто - потоки работать параллельно и вся связка упрется в скорость самого медленного из потоков (ну при правильном протоколе обмена, само собой).

no-dashi-v2 ★★★
()
Последнее исправление: no-dashi-v2 (всего исправлений: 1)
Ответ на: комментарий от LINUX-ORG-RU

Суть в том что ты выделяешь кусочки памяти в декодере копируешь туда данные и отправляешь в очередь (массив) а енкодер по мере поступления от туда эти кусочки берёт и делает что-то с ними.

межтредовая очередь это полный аналог ленточного транспортера… N рабочих ставят свой ящик на транспортер и бегут за новым , и M рабочих берут ящики с транспортера и ставят например их на полки на складе.

схема называется поставщики/получатели.

если «ящики»(блоки данных) большие - они не копируются, а в очередь кладется указатель на блок.

для пущей скоросоти заранее берется X буферов(ящиков) и делаются две очереди. одна - «полных ящиков» передаваемых потребителям, и вторая - пустых ящиков - возвращаемых от потребителей к поставщикам.

поставщик:

  • берет из очереди пустых ящиков пустой(становится его монопольным владельцем, ибо только у него есть на него указатель), заполняет его данными и кладет в очередь «полных ящиков».

потребитель делает все наоборот:

  • берет из очереди полных ящиков полный (то есть указатель на блок данных), делает что надо с ним, и кладет указатель в очередь пустых ящиков.

очередь реализуется на мьютексе и двух условных переменных, они скрыты внутри обьекта очередь.

мьютекс внутри очереди лочится только в момент заталкивания или выборки обьекта из нее. поскольку коллизия может быть только в этом месте.

ps

если поток-поставщик пытается получить пустой ящик из очереди пустых, но ящика там нет(туда его еще не положили) - поставщик засыпает на внутренней условной переменной очереди - и становится «ожидающим поставщиком». как только ящик туда положат, ожидающие поставщики(или самый первый ожидающий) будут разбужены, и заберут ящик.

то есть, если ты читаешь из очереди и она пуста - поток засыпает, это блокирующая операция.

и наоборот, если ты пишешь в очередь,и есть ограничение ее длины, и оно достигнуто - пишуший засыпает, до тех пор пока читающий не вытащит оттуда элемент. но это касается очередей с ограничением по длине.

alysnix ★★★
()
Последнее исправление: alysnix (всего исправлений: 1)
Ответ на: комментарий от deep-purple

данные поступают с камеры - ОДИН вход

идут в декодер - однопоточная и самая медленная часть сейчас

после декодера перед энкодером предполагаю скопировать полученный результаты в памяти поскольку полагаю это «мгновенная операция»

энкодер по полученным данным в 12 потоков отправляет данные в ОДИН выход

все это дело занимает 50% цпу как в среднем, так и приблизительно по ядрам, поэтому цель кодирование и декодирование выполнять параллельно.

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 2)
Ответ на: комментарий от alysnix

у меня один выход и один вход, т.е. сколько бы не брало и не ставило «рабочих» они все ждут очереди на входе-выходе, поэтому задача - чтобы 1 рабочий ставил, другой пересыпал и нес к выходу, процесс их движения проходил одновременно, а во время пересыпки один ждал другого, потому что «пересыпать» БЫСТРАЯ операция, а нести ДАЛЕКО.

wolverin ★★★
() автор топика
Ответ на: комментарий от no-dashi-v2

что за ссылки на буферы? кто их заполнить должен? т.е. после декодера мне нужно скопировать в буфер? для чего мне вообще очередь, если декодер сейчас самая медленная операция и кодировщик все равно упрется по скорости в последние данные от декодера!?

wolverin ★★★
() автор топика
Ответ на: комментарий от LINUX-ORG-RU

надо обмозговать сначала ваш пример мне для начала, но еще раз по очереди - если для очереди нужно копирование данных, то какой в ней тогда смысл, я бы еще понял, что каждый поток создает каждый раз новый объем памяти и помещает ссылку в очередь, но и тут не факт что резервирование памяти шибко быстрее копирования.

да и по памяти я ограничен в 512Мб, а каждый буфер это около 200Кб, при этом сами кодеры-енкодеры отжирают лишь по данным top 40% памяти

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от wolverin

пока пришел к мысли, что проверять статусы декодера и кодировщика буду в том же месте, где заблокированным мутексом буду копировать данные в ДО начала выполнения потоков выделенный буфер, осталось понять как определять что из потока кодировщика пора выполнять копирование, а из потока декодировщика что копирование закончилось… видимо хоть как нужно еще 2 мутекса…

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от wolverin

либо вообще перейти на ДВА семафора и из одного потока проверять от другого значение семафора…

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от wolverin

все это дело занимает 50% цпу как в среднем, так и приблизительно по ядрам

4 ядра у меня на все если это важно

wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

Не понял что там за 12 потоков в один выход. Вроде ж про два разговор.

Ладно:

Один мьютекс, буфер А и буфер Б. Потоки декодера и энкодера лочат по одному буферу за раз и только в момент записи или чтения (копирования, от себя или к себе). Состояния буферов булевые: записан или прочитан. Оба треда, каждый сам по себе, подходят к буферам по очереди, сначала А, потом Б и так бесконечно. При старте оба буфера в состоянии прочитано.

Энкодер: залочили мьютекс, проверили что там стоит флаг прочитано, записали, поставили флаг записано, разлочили, ушли лопатить следующий кадр. Пришли к буферу Б, делаем аналогично А. Если состояние буфера говорит, что из него еще не прочитано, увы и ах — придется скипать кадр и не писать его в буфер.

Декодер: залочили мьютекс, проверили что там стоит флаг записано, прочитали, поставили флаг прочитано, разлочили, ушли отдавать полученый кадр. Пришли к буферу Б, делаем аналогично А. Если состояние буфера говорит, что в него еще ничего не записано, увы и ах — придется отдать ранее забранный кадр.

Да, мьютекс один, использовать аки лок, а не трайлок - при попадании в ожидание, там ждать микросекунды, лишь окончания записи или чтения + смены флага состояния.

deep-purple ★★★★★
()
Последнее исправление: deep-purple (всего исправлений: 1)

чтобы обозначить владельца мьютекс или что должны входить в структуру контейнера данных

struct chunk {
  void* data;
  size_t data_sz;
  ...
  pthread_mutex_t mutex;
}
zudwa
()
Последнее исправление: zudwa (всего исправлений: 1)
Ответ на: комментарий от imb

спасибо, искал про это, но у меня они меняются местами у потоков чтение и запись

вопщим решил сделать на семафорах, чтобы можно было управлять блокировками из разных потоков, а не как в мютексе из одного - создаю 2 заблоченных семафора для каждого потока, кодер ждет разблокировки своего семафора из потока декодера, запускаясь сразу блочит свой семафор чтобы через итерацияю декодер опять его пнул, декодер же после декодирования разлочит семафор кодера, доходит до своего заблоченного семафора и ждет пока кодер после проверки и копирования разлочит его - и так по циклу вне зависимости какой поток быстрее

единственно думаю логичное предложение не копировать буфер, а делать подмену указателей на буферы и использовать аналог очереди из 2х буферов.

wolverin ★★★
() автор топика
Ответ на: комментарий от deep-purple

12 потоков это библиотека кодера создает, в моем случае это ффмпег 12 потоков масштабирования/замены цветового пространства и следом 4 потока сжатия в х264 (они делят буфер на какие то там слайсы и параллельно кодируют участки буфера) - поток кодирования

просто проблема с декодером мжпег, который в ффмпег однопоточный

wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

что за ссылки на буферы? кто их заполнить должен?

Ты, естественно

т.е. после декодера мне нужно скопировать в буфер? для чего мне вообще очередь, если декодер сейчас самая медленная операция и кодировщик все равно упрется по скорости в последние данные от декодера!?

У тебя есть связка producer -> consumer.

producer генерирует данные, consumer их потребляет

Как выглядит нормальная схема?

Ты создаешь pipe. У него, как известно, два конца - в один можно записывать, а из другого читать записанное. Читать и писать в этот пайп ты будешь чанками - фрагментами блоками достаточно большого размера. Каждый чанк (например) состоит из четырехбайтного заголовка в котором указан размер данных и зщатем собственно указанного количества байт.

Консамер стартует и начинает читать из пайпа чанк данных. Поскольку данных нет, консамер ожидает их и ничего не делает.

Продюсер генерирует данные и записывает их в буфер. После того, как буфер заполнился, консамер сбрасывает его как чанк в пайп. Консамер читает данные чанка в свой буфер, и после прочтения всего чанка, начинает его обработку.

При этом, продюсер разблокируется сразу после того как консамер прочел данные и начинает обработку новой порции данных.

Пайп выполняет за тебя всю синхронизацию - если консмер не успевает, продюсер будет его ожидать. Если консамер отрабатывает быстрей чем продюсер генерирует данные, то он будет ожидать. Но собственно обработка данных будет идти постоянно, то есть в люой момент времени у тебя будет работать либо продюсер, либо консамер, либо оба (время на передачу данных не учитываем, оно достаточно малое - микросекунды).

Если ты прикроешь одним мутексом чтение чанка из пайп а вторым запись чанка в пайп, то у тебя по сути организуется почти классическая очередь в которую могут писать сколько хочешь продюсеров и из которой в раунд-робине будут брать данные сколько захочешь консамеров.

ПыСы: думаю тебе надо подучить теорию, ты лезешь в области которые требует существенно большей подготовки. Почитай про механизмы семафоров, мутексов, шаред мемори, неименованных каналов.

no-dashi-v2 ★★★
()
Ответ на: комментарий от no-dashi-v2

у меня нет сколько хочешь, у меня всего 2 и городить трубу, которая фактически состоит из либо 2х буферов либо заполнена вообще вся не вижу смысла

wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

у меня один выход и один вход, т.е. сколько бы не брало и не ставило «рабочих» они все ждут очереди на входе-выходе, поэтому задача - чтобы 1 рабочий ставил, другой пересыпал и нес к выходу

очередь этот вопрос и решает. просто у тебя будет частный случай использования, который не делает саму реализацию очереди проще. канинически очередь имеет стратегию N-кладут, M - получают. если N=M=1, то это просто частный случай.

не делай эту кухню сам, код будет неверный 100%(если до этого не реализовывал тред сейф очереди). бери какую-нить реализацию из известной либы и используй или смотри там код, как оно там сделано(если хочешь понять как оно надо делать правильно).

все советы как это сделать только на мьютексах - в мусорку. это корректно для общего случая делается лишь на кондварах и мьютексах. собственно для реализации очередей(и похожих штук) кондвары и ввели как примитив синхронизации! такая реализация и должна быть в либе.

alysnix ★★★
()
Ответ на: комментарий от firkax

Согласен. Выглядит как первокурсник у которого сишку читают и что-то рассказали про то что есть какие-то мьютексы и семафоры.

peregrine ★★★★★
()
Ответ на: комментарий от wolverin

для чего мне вообще очередь

я конечно не очень люблю многопоточное программирование и сишку или c++, но мне совершенно понятно для чего там очередь. Очередь нужна как раз для того, чтобы у тебч просто и логично управление синхронизацией потоков было.

peregrine ★★★★★
()
Ответ на: комментарий от wolverin

Максимально-быстро - это борьба за процессорные такты, за нано и микро-секунды?

Тогда изоляция двух ядер CPU (isolcpu), привязка двух потоков к этим CPU (affinity, можно cgroup), и, как советовал deep-purple, два буфера. Примитив синхронизации в этом случае - спинлок.

Но не уверен, что можешь позволить в общем случае изолировать процессоры, и не уверен, что это тебе действительно нужно.

blex ★★★
()
  1. переменную int состояния ошибок в потоках - нужно ли 2 таких переменных (ошибка в любом потоке должна останавливать оба), можно ли писать и читать из разных потоков одновременно без мутексов когда

Можно, атомарную.

  1. потоки должны ждать друг друга - после декодирования когда скопируется память, после кодирования когда будут готовы новые данные для копирования - получается еще 2 переменных!? каждую тоже нужно защищать мутексом? или как в п.1 раз один поток только пишет, а другой только читает можно без них обойтись?

Я подозреваю с triple buffering можно обойтись без мьютексов, чисто на CAS-ах указателей на буферы. Проверю потом. Если кто до меня не успеет.

utf8nowhere ★★★
()
Ответ на: комментарий от blex

Да про 2 буфера без копирования, просто замена ссылок, я согласен это выглядит быстро.

Пока борьба сделать из 120 мс хотя бы 70, потом наверно попробую цпу разогнать на 20%

wolverin ★★★
() автор топика
Ответ на: комментарий от utf8nowhere

Пока не знаю что это, но решил сначала попробовать на 2х по дефолту заблокированных семафорах, которые управляются одновременно из разных потоков

wolverin ★★★
() автор топика

ничего не понял, где сложность ?

один поток читает фреймы с камеры, кидает в очередь, воркеры разбирают/обрабатывают, отдают в jitter (чтобы соблюсть очерёдность), оттуда выход.

Это-ж 100500 раз везде реализовано. в ffmpeg наверняка такое можно наплагинить, в gstreamer так абсолютно точно.

для ТС - получаются не переменные, получаются очереди. Внутри очереди mutex (можно rwmutex) и условные переменные. А где то в частностях может и неблокирующие получатся.

и просаживаетесь вы на копировании память-память (которое считаете почему-то бесплатным). посмотрите ради любопытства всякое про звук и видео - там люди сильно напрягаются, лишь-бы избегать копирований фреймов

MKuznetsov ★★★★★
()
Ответ на: комментарий от MKuznetsov

для ТС - получаются не переменные, получаются очереди

Может это зависит от того, какая стратегия реагирования на отставание консьюмера от продьюсера?

utf8nowhere ★★★
()
Ответ на: комментарий от deep-purple

вопщим сделал примитивную очередь из 2х буферов и 2х дефолтно заблокированных семафоров, запускаю поток одного из потока другого, а блокирую до следующего участка синхронизации в этом же и получил то что хотел - время выполнения упало до самого медленного участка кода - потока декодирования (в нем еще заполнение буфера для декодирования есть, но оно считаю на уровне погрешности в сравнении со временем декодирования), чтобы избежать копирование буферов просто смещаю позицию следующего свободного буфера в двойном указателе.

wolverin ★★★
() автор топика
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.