LINUX.ORG.RU

Конкурирующая очередь в thread C++

 ,


0

2

Продолжаю мучить сообщество своими хлупыми вопросами на похожие темы ) Вернуть результат thread в С++ без мутексов?

Необходимо запараллелить 3 потока

  1. получение данных (чтение с камеры)
  2. пересжатие и запись в сеть (декодировани в основном и кодирование в дополнительном уже выполняется кажется эффективно в 2х потоках на Си через 2 семафора, по очереди отпускающие поток друг друга через условную «очередь» из массива 2х указателей, в которой меняется только индекс)
  3. запись данных из п.1 на диск

В однопоточной реализации вариантов 1+3 или 1+2 все работает вроде эффективно, задача 1 выполняется менее 30 мс, 2 - от 30 до 100мс, 3 - в районе 50мс

Теоретически конечно потоки 2 и 3 должны только читать память из потока 1 (хотя что происходит внутри сторонней либы потока 2 я не знаю) и теоретически можно было бы обойтись одним семафором для основного потока 1, НО поток 3 должен читать и записывать данные (кадры) на диск через равные промежутки времени, а поток 2 читать все данные (кадры) без задержек.

В предыдущей теме меня пнули в сторону атомиков, из чтения всяких хабров я понял следующее - работающие только с памятью атомик существенно быстрее, чем системный вызов мьютекса/семафора, однако как я понял в этом случае предлагается ждать в бесконечном цикле изменение атомика - но ведь это загрузит поток на 100%, а у меня только поток 2 грузит оставшиеся 300% на 4х ядрах

При этом как бы еще решить проблему конкуренции за ПОСЛЕДНИЙ кадр, все что раньше попало в очередь уже не имеет значения и просто считаю дропнутым

ЗЫ. Извините за многобукав! ЗЫ1. хотелось бы понять направление в которое двигаться.

★★★

Последнее исправление: wolverin (всего исправлений: 1)

В предыдущей теме меня пнули в сторону атомиков

В предыдущей теме тебе атомики презентовали как способ передать флаг в другой поток, а не для синхронизации потоков. Или ты там не все карты на стол выложил. Ну тогда и ответы будут не точными.

но ведь это загрузит поток на 100%

Ты изобрёл spinlock. Он полезен когда ты уверен, что долго ждать не придётся. Иначе 100500% загрузки ЦП. Это не баг, это фича.

Делай на мьютексах. Ты пытаешься оптимизировать, но это бессмысленно, если нет реализации относительно которой ты оптимизируешь. Сначала сделай по простому, проведи замеры, а потом уже извращайся. Может тебе и мьютексы подойдут. Входной поток ограничен частотой кадров камеры, так что гигантских скоростей не будет.

ox55ff ★★★★★
()
Ответ на: комментарий от aiqu6Ait

Первый раз со встроенными системами?

Тут, скорее, в первый раз с многопоточностью вообще.

eao197 ★★★★★
()
Ответ на: комментарий от ox55ff

да, реализации нет, пока что не понимаю как «бороться» за последний кадр в очереди так чтобы НЕ тормозить потоки 2 (особенно) и 3 (не так критично), если все таки предположить что только один поток может с ней работать (не рисковать вариантом, что потоки 2 и 3 только читающие)

ждать вроде как не долго в варианте с атомиком, но количество ожиданий получается бесконечно

единственная мысль какая приходит в голову это основной поток все время пишет по кругу в условную очередь из 4х кадров, каждый кадр защищается своим мьютексом, первый доступный с условного конца обрабатывает 2 или 3 поток

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 2)
Ответ на: комментарий от aiqu6Ait

одно из другого вытекает ) каждое ядро слабое, но их хотя б 4, поэтому повысить производительность можно только многопоточностью, до этого были задачи только с не пересекающимися потоками

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от wolverin

Похоже, не в слабости отдельных ядер, а в том, что архитектура приложения - говно… Надеюсь, для получения изображения используется DMA, для запуска обработки кадра - прерывания, работа идет в 2 буфера и в проекте используются протонити (или просто потоки, но они более тяжелые) и fsm?

aiqu6Ait ★★★★
()
Ответ на: комментарий от aiqu6Ait

нет, через драйвер, с получением изображения проблемы нет, есть проблема с медленным пересжатием

wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

конечно

вот вообще неочевидно и необязательно.

nanopi neo

Ну ресурсов там жопой жуй, на задачу хватит. Сделай профилирование и посмотри, где можно оптимизировать. В конце концов, под твою платформу есть примеры работы с камерой.

aiqu6Ait ★★★★
()
Ответ на: комментарий от aiqu6Ait

я честно говоря ничего кроме ffmpeg-а с его древним cedrus не нашел, пока им же пересжимаю софтово из mjpeg в x264, не 2Мп конечно, но пока сойдет, сейчас хочу пока со своей 3х поточностью разобраться.

wolverin ★★★
() автор топика

В предыдущей теме меня пнули в сторону атомиков, из чтения всяких хабров я понял следующее - работающие только с памятью атомик существенно быстрее, чем системный вызов мьютекса/семафора,

Семафоры и мютексы не делают (по идее) системных вызовов если им не надо никого ждать. Системные вызовы там только когда «занято», для того чтоб не тратить процессорное время а переключиться на другой тред, которому проц сейчас нужнее. А спинлоки (ожидание атомика в цикле) нужны для того, чтоб не отдавать никому проц на время ожидания, когда ожидаемое время ожидания заметно меньше (или хотя бы сравнимо) чем время переключения контекста.

firkax ★★★★★
()

Теоретически конечно потоки 2 и 3 должны только читать память из потока 1 (хотя что происходит внутри сторонней либы потока 2 я не знаю)

Можно проверить наверняка - скорми в свою либу данные, которые находятся в read only памяти, условно:

const int i = 3;

int main() {
        int *a = const_cast<int*>(&i);
        *a = 8; // segfault
}

если сегфолтнится, значит модифицирует.

По организации очереди - ну навскидку, если задачу понял, то мне с дивана кажется годным:

class Queue {
   vector<shared_ptr> data;
   int decode_send_to_net_pos;
   int write_to_disk_pos;
public:
   void insert_frame(shared_ptr in) {
      temp_shared_ptr_buf;
      {
         get_unique_lock;
         data.push_back(in);
         // move range [ 0, min(write_pos, send_pos) )
         // to temp_shared_ptr_buf, adjust pos
      }
      // delete temp_shared_ptr_buf;
   }
   shared_ptr next_frame_for_decode() {
      get_shared_lock;
      return data[decode_send_to_net_pos++];
   }
   shared_ptr next_frame_for_disk();
}

Благодаря шаред_птр заюзан вектор, значит быстрый, произвольный доступ без инвалидации при вставке. Также длительность блокировок должна быть небольшой, накостылить spin lock может быть вполне норм (тест покажет что будет лучше - shared_mutex или spin lock).

kvpfs ★★
()
Последнее исправление: kvpfs (всего исправлений: 1)

Не надо мудрить. Пункты 2 и 3 можно не параллелить. Тогда всё вырождается в простейший алгоритм:

  1. Поток 1 читает блок данных, взводит флаг.

  2. Поток 2 ждёт флага, перекодирует блок данных, отправляет в сеть и уже после этого записывает исходный блок на диск.

От того, что ты отложишь запись на диск на несколько десятков миллисекунд, ничего не поменяется.

Beewek ★★★
()

Задача не очень понятно сформулирована. Хочешь ли ты записывать на диск всё или только какую-то часть? Должно ли быть на диске всё в реальном времени или нет? Потому что вот эта фраза вообще непонятна:

поток 3 должен читать и записывать данные (кадры) на диск через равные промежутки времени

что именно тут требуется?

Поскольку задачи 2 и 3 никак друг другу не мешают, то после готовности новых данных с камеры просто отсылай обоим потокам команду продолжить обработку. Ну и следи, чтобы первый поток не удалил принятые данные раньше чем потоки 2 и 3 закончат их читать.

firkax ★★★★★
()
Ответ на: комментарий от Beewek

Может у него проца впритык хватает на перекодирование, и делать ещё что-то в том же потоке уже никак.

firkax ★★★★★
()
Ответ на: комментарий от Beewek

На диск не критично задержка, лишь бы в 15 фпс укладываться, а вот стрим в сеть упадет по фпс аж на 50 мс, а он вроде как live

Да и может и не быть записи, поэтому выглядит проще поток какой то из 2х не запускать (перекодирование тоже только по событию подключения к своему rtsp серверу делаю, а не все время, запись если требуется постоянна)

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от firkax

Про фразу - на диск надо 15 фпс, а в сеть отдаю насколько условных 3х ядер из 4х хватает (300% из 400 сейчас нагрузка при пересжатии) это где то 10-20 фпс в зависимости от приемлемого отдаваемого разрешения и постоянно плавает.

Ещё в отдельном потоке rtsp сервер крутится сам по себе с каллбеками

В теории пока, реализация участками и однопоточная частично сейчас

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 2)
Ответ на: комментарий от wolverin

На диск не критично задержка, лишь бы в 15 фпс укладываться, а вот стрим в сеть упадет по фпс аж на 50 мс, а он вроде как live

Ты самое главное не указал: с какой частотой поступают новые блоки. Раз в сколько мс. От этого потом уже все расчёты надо вести.

Beewek ★★★
()
Ответ на: комментарий от Beewek

30 фпс или около 30 мс (не уверен, если читать быстрее, а это USB 2 позволяет, то наверное должны быть повторы кадров, не знаю что камера отдает если ее читать быстрее, не пробовал ещё)) ) и это самый быстрый полагаю родительский поток

wolverin ★★★
() автор топика
Ответ на: комментарий от kvpfs
class Queue {
   vector<shared_ptr> data;
   int decode_send_to_net_pos;
...

это вообще не очередь. где тут ожидание если очередь пуста? где хоть намек на переполнение? любая очередь по передаче данных между тредами должна имплементировать ожидание треда(или нескольких)на выборке при пустой очереди, и ожидание при вставке - если очередь переполнена. очередь может переполниться если группа тредов потребителей сломалась или работает слишком медленно. в данном случае запись на диск идет медленней, чем поступают фреймы.

такая тредсейф очередь делается на мьютексах и условных переменных. реализаций в сети много.

alysnix ★★★
()

НО поток 3 должен читать и записывать данные (кадры) на диск через равные промежутки времени

Это зачем это?

работающие только с памятью атомик существенно быстрее, чем системный вызов мьютекса/семафора

Забудь про это. У тебя есть IO и CPU bound задачи на которые тратится время, а издержками на синхронизацию можно пренебречь, поэтому её нужно делать самым простым способом. Я бы сделал один mutex+condvar на всё про всё.

Логика примерно такая: 1 читает кадр из камеры, оборачивает его в shared_ptr (потокобезопасный, они к слову как раз на атомиках), берёт мутекс и кладёт по копии этого shared_ptr в отдельные очереди для 2 и 3 (сам кадр при этом не копируется, понятное дело), после чего сигналит кондвар. 2 и 3 устроены по стандартной модели консьюмера на кондваре - wait на кондваре, вычитка очереди под мутексом и обработка кадра с отпущенным мутексом. Собственно кто последний из 2 и 3 обработает кадр тот его потокобезопасно освободит уничтожением своей копии shred_ptr.

Типа-правильно конечно сделать разные мутексы и кондвары на очереди 2 и 3, но я сознательно не стал бы этого делать.

slovazap ★★★★★
()
Ответ на: комментарий от alysnix

Концепция описана псевдокодом, странные претензии, странно, что остался незамеченным непараметризированный shared_ptr. Добавить уход в сон читающим - не проблема, а вот в случае переполнения будет потеря данных как ни крути, сцена перед камерой ждать не будет, я бы подрезал начало очереди в главном потоке при разбухании очереди выше лимита. В общем всё это мелочёвка, которую легко добавить при желании.

kvpfs ★★
()
Последнее исправление: kvpfs (всего исправлений: 1)
Ответ на: комментарий от slovazap

Это зачем это?

чтоб чтение потом с архива происходило плавно, а не где то пусто, где то густо, рывки в «дырках» видны на воспроизведении + удерживание постоянной скорости записи дает условно постоянную длину архива

пробую осмыслить логику…

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от alysnix

цели обработать все кадры с камеры нет, главное чтобы потоки 2 и 3 не тормозились в ожидании, а читали самый последний доступный кадр, проверить попробую, но не уверен все таки что поток 2 ничего не пишет при декодировании, поэтому все таки предполагаю, что нельзя один и тот же кадр 2 и 3 потоком читать.

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)

Извините за многобукав! ЗЫ1. хотелось бы понять направление в которое двигаться.

Прочитай уже «C++ Concurrency in action»

slackwarrior ★★★★★
()
Последнее исправление: slackwarrior (всего исправлений: 1)
Ответ на: комментарий от slackwarrior

) хорошо

хотя полагаю все таки надо опять использовать семафор на каждый кадр, а не мьютекс, поскольку его можно проверять на блокировку без задержек и из С++ передавать в Сишный код, осталось придумать алгоритм поиска последнего доступного кадра как для чтения, так и места для записи в очереди, которую тупо представить заранее выделенным массивом указателей и писать по кругу.

вроде не очень тупая затея )

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от wolverin

чтоб чтение потом с архива происходило плавно, а не где то пусто, где то густо, рывки в «дырках» видны на воспроизведении + удерживание постоянной скорости записи дает условно постоянную длину архива

Это звучит как бред. Воспроизведение должно быть привязано к таймстемпам кадров, они появляются в моменты чтения с камеры и едут через весь пайплайн до сохранения на диск (и в сеть тоже, скорее всего, должны ехать). Как писать кадры на диск совершенно пофиг для воспроизведения, а писать большими пачками вместо отдельных кадров скорее всего эффективнее.

slovazap ★★★★★
()

Может быть сделать так?

Поток 1 читает кадр и кладёт в очередь кадров на сжатие в поток 2, который сжимает и кладёт две копии кадра в очереди потоков 3 и 4, которые занимаются отправкой кадров по сети и на диск соответственно.

Мьютексы свои у каждой очереди и они будут блокироваться только на время изменения указателя на голову очереди, это даже можно сделать на атомиках (очередь хранит адреса памяти, где кадр лежит, для быстроты доступа лучше сделать пул памяти).

В этом случае код хорошо масштабируемый и изолированный друг от друга. Профилировать будет легко - просто смотреть размер очередей. Поток 2 потом засунуть в CUDA или типа того.

raspopov
()
Последнее исправление: raspopov (всего исправлений: 1)
Ответ на: комментарий от slovazap

конечно же не пофег, от того что каждый кадр имеет временную метку не означает равномерное воспроизведение, не говоря уже о равномерной нагрузке на диск (в моем случае флешку) как по скорости, так и еще раз повторю по размеру архива, я даже с камеры то смотрю при плавающей фпс вижу эту неравномерность, запись же делается через stdio.h, которая помимо кеша ОС имеет и свой выровненный кеш по размеру сектора (поскольку запись выполняется по кругу на «сырой» диск), а значит абсолютно без разницы по кадру пишется в кеш или пачкой.

wolverin ★★★
() автор топика
Ответ на: комментарий от raspopov

копирование памяти однозначно снижает производительность, да ее у меня всего то 512Мб (при 200Кб на кадр mjpeg из нее куда то девается 30% только чтением и записью на диск), cuda, vdpau, vaapi у меня нет, это arm-ка

да, на атомиках скорее всего надо делать номера позиций массива заранее инициализированных указателей, которые «заблокированы», это избавит от последовательного поиска свободной позиции через проверку семафора, а вот как сделать тот самый пул памяти, из которой можно было бы просто брать последний свободный адрес (БЕЗ удаления и выделения новой памяти) пока не сообразил

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 2)

У тебя какая-то явно навязчивая обмазываться непотребством. Какие потоки, чего потоки, чего ждут.

Не говоря уже о том, что у тебя 10 фпс и ты собрался «быстрее» что-то делать. Чтобы что? Какая-то конкуренция.

В предыдущей теме меня пнули в сторону атомиков, из чтения всяких хабров я понял следующее - работающие только с памятью атомик существенно быстрее, чем системный вызов мьютекса/семафора, однако как я понял в этом случае предлагается ждать в бесконечном цикле изменение атомика - но ведь это загрузит поток на 100%, а у меня только поток 2 грузит оставшиеся 300% на 4х ядрах

У тебя проблема с пониманием. Если у тебя потоки лочатся - они не выполняются параллельно. Если выполняются - им нужна синхронизация. Атомик здесь помогает.

Очевидно, что если тебе нужно залочить тред, то атомик для этого ненужен. Только зачем этот тред лочить?

Так же, тебе никто не запрещает засыпать долбя этот атомик/прочее. Здесь, конечно, тебя никто не разбудит. Да и с ядерной дриснёй гарантий никаких нет. Поэтому да - тем кому важны задержки - долбят.

К тому же, насколько я понял, у тебя камера быстрее нежели твои читатели. В данном случае у тебя в принципе долбёжки не будет. Ты будешь дропать кадры с камеры, либо что-то ещё. Важно то, что кадры всегда будут.

right_security
()
Ответ на: комментарий от wolverin

Там ненужно ничего копировать. Копировать ро-память это чушь полнейшая.

А так он тебе правильно описал базу. Пишешь, другие читают. Всё - тебе этого хватит. То, что ты нагородил в своих рассуждениях - это какая-то совсем шиза шизы.

right_security
()
Ответ на: комментарий от right_security

а может у тебя проблема с пониманием? такие как ты мелят ахинею, а потом говорят мне - это не поедет, давай купим железку в 100 раз дороже, в предыдущих тема такой же бесвязный поток сознания уже выливали подобные тебе, однако предыдущее поехало успешно, иди куда шел

ЗЫ. изначально было 5 фпс на 2 Мп пересжатия mjpeg в x264

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 2)
Ответ на: комментарий от wolverin

Ога. Но ты фантазируй дальше. В целом да, я не заходил в этот тупняк. Но что-то решил зайти - зря. В целом по твоим рассуждениям уже было всё ясно.

Ну и как быстро ты потерялся вместо ответа на вопросы. В любом случае тут не запрещено множить невежество. Производи.

right_security
()
Ответ на: комментарий от wolverin

Такие. Я написал про атомики/прочее. Ты ничего не понял и решил не позориться, но вместо того чтобы не отвечать, либо погуглить - ты начал агриться на меня, производя тупняк. Забалтывая и отрицая реальность. Позорься дальше.

right_security
()
Ответ на: комментарий от right_security

про атомики и без тебя понятно - я уже написал что на них логично держать используемые индексы из массива кадров, мне нужно найти последний доступный кадр в этом массиве для чтения (которое считать блокирующим), с учетом того, что 1 поток будет по кругу обновлять этот массив

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 1)
Ответ на: комментарий от wolverin

Такое. Тебе сообщили, что все твои рассуждения чушь. Атомики и прочее - это тогда, когда пацанам синхронизация/обновление данных нужна миллионы раз в секунду. Ну хотя бы тысячи.

У тебя там 10 фпс. Это нисколько. Есть средства на которых делать такой колхоз. Просто суй в вектор под локом. Тебе этого хватит. Не говоря уже о том, что там никакая очередь в принципе ненужна. Берёшь пускаешь треды на каждый кадр. Всё само работает.

Атомики это сложно. Бери какую-нибудь очередь из либы. Тебе хватит.

Структуры данных на атомиках - это ещё дохрена более сложно. Ты хочешь, чтобы тебя этому научили? Никто этим заниматься не будет.

У меня такое ощущение, что ты хочешь в своих фантазиях что-то победить, но за счёт левых людей. Это тупиковый путь.

Хочешь обсуждать какие-то структуры данных? Чётко формулируй требования. Какие свойство тебе нужны. Там можно уже будет поговорить про использование атомиков.

Хотя бы сообщи что такое "последний?.

right_security
()
Ответ на: комментарий от right_security

последний - тот что с камеры последним считался, для архива одни требования по потоку, для просмотра текущего видео - другие.

может 10 фпс или 100 мс это нисколько, но 30 фпс или 33 мс - это 33% времени ничего не делается кроме чтения с камеры и это пока предельное разрешение (пока фактически не используемое, используется вариант в 15-20 фпс, хотя можно и все 30 брать на меньшем разрешении), может cedrus удастся прикрутить для h264 и что то придумать аппаратное для mjpeg и не переделывать потом очередь, которая окажется не очередь.

и про вектор думаю, но это нужно будет постоянно удалять-вставлять данные, на векторе я держу массив временных меток соответствующих ардесам на диске, для быстрого поиска места воспроизведения архива, но там и скорость записи как раз «нисколько» и однопоточная задача

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 4)
Ответ на: комментарий от right_security

опять же НИСКОЛЬКО - это время, а ЦПУ уже на 3/4 ЗАНЯТ только чтением с камеры, пересжатием и записью в сеть! а мне еще внешним оборудованием управлять надо

wolverin ★★★
() автор топика
Последнее исправление: wolverin (всего исправлений: 2)
Ответ на: комментарий от wolverin

Пожалуйста, перестаньте кормить этого персонажа. Единственное действенное средство выпиливания его отсюда – это игнорирование его потоков сознания.

eao197 ★★★★★
()
Ответ на: комментарий от eao197

попробуйте валерьяночку для начала, вдруг поможет от настороженности, может я и тупой конечно, но все лучше чем заявлять что вайт по времени это не блокировка

wolverin ★★★
() автор топика
Ответ на: комментарий от wolverin

но все лучше чем заявлять что вайт по времени это не блокировка

Могли бы вы показать пальцем, где я такое утверждал? Точную цитату, плиз.

eao197 ★★★★★
()
Ответ на: комментарий от wolverin

Бремя доказательства лежит на том, кто утверждение сделал. Если вы говорите, что я заявлял «что вайт по времени это не блокировка», то это вам следует предоставить подтверждение.

В предыдущей теме я вам дал ссылку на wait_for для std::future и привел пример, демонстрирующий ожидание в течении 0 (нуля) миллисекунд.

Как из этого может следовать «что вайт по времени это не блокировка» мне не ведомо.

eao197 ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.