LINUX.ORG.RU

LockFree очередь сообщений в shared memory - какие хорошие реализации?


1

4

Итак, имеем следующую задачу. Один (или несколько) процессов вычитывают из сети сообщения. Несколько других процессов обрабатывают эти сообщения. Т.е. апи приблизительно такой -

  • retcode_t push(void* data, int size);
  • retcode_t pop(void* data, int max_size, int* size);

При том это все рабоатет на уровне сообщения. Т.е. если вставили тру кусочка по 100, 200, и 300 то и выберем 100, 200, 300 а не 600 одним куском.

Хотелось бы это сделать без блокировок. Писателей/читателей от 4 до 50

★★★
Ответ на: комментарий от vromanov

Ну и простого атомарного свопа недостаточно.. Об этом и был вопрос.

Ну так опиши задачу еще раз, но нормально, со всеми условиями.

ttnl ★★★★★
()
Ответ на: комментарий от vromanov

Эта очередь сообщений для логирования и хочется сохранить последовательность сообщений

Ну и сохраняйте, что мешает?

Количество читатаелей/писателей может меняться

Не важно.

Ну и простого атомарного свопа недостаточно.. Об этом и был вопрос.

Достаточно.

mv ★★★★★
()
Ответ на: комментарий от Absurd

Это не очередь, это стек. Для FIFO нужно dequeue делать из начала, а не с конца.

Есть очередь (список), писатель у последнего элемента свопом меняет указатель next на добавляемый писателем элемент.

mv ★★★★★
()
Ответ на: комментарий от codeogre

«обработчики пусть без блокировки толкают сообщения в очередь лога» об этом и вопрос. Нужна очередь куда можно толкать без блокировки. А так именно так и сделано. Есть очередь, есть отдельный процесс который пишет все на диск.

vromanov ★★★
() автор топика
Ответ на: комментарий от nerdogeek

На самом деле зависит от настроек, но ничего хорошего не будет. Либо не получат доступа коому он нужен, либо получат доступ те, кому нельзя. Тут проектировали не мы, это стандарты 3gpp.

vromanov ★★★
() автор топика
Ответ на: комментарий от vertexua

Там таймер libev который вызывается каждую ms. Если очередь пустая - сразу выходим. На «холостом ходу» цпу почти не жрется.

vromanov ★★★
() автор топика
Ответ на: комментарий от vromanov

Не удовлетворяется условие shared memory

Вы аллокатор не напишите для шаред мемори?

namespace tbb {
        template<typename T, 
        typename Alloc=cache_aligned_allocator<T> >
        class concurrent_queue {

nerdogeek
()
Ответ на: комментарий от mv

А элементы где лежат? Еще раз хочу обратить внимание на условие shared memory. Это могут быть совсем разные процессы.

vromanov ★★★
() автор топика
Ответ на: комментарий от vromanov

А элементы где лежат? Еще раз хочу обратить внимание на условие shared memory. Это могут быть совсем разные процессы.

Пусть лежат в shared memory. Вместо прямых понйтеров тогда ptrdiff, либо вообще можно заммапить сегмент по одинаковому адресу, тогда и прямые пойнтеры работать будут.

mv ★★★★★
()
Ответ на: комментарий от vromanov

Все равно будут блокировки. Как диспетчер будет будет общаться с писателями/читателями?

Внезапно, UNIX domain socket или pipe. При правильном проектировании писателей и читателей единственные три метода, где блокировка необходима - это push, pull и status внутри диспетчера. Других блокирующихся мест нет.

no-dashi ★★★★★
()
Ответ на: комментарий от no-dashi

Вы пробовали засунуть в pipe мегабайт не вычитывая с другой стороны? Как вы себе представляете использование pipe если общение происходит между двумя сервисами и один сервси рестартует? Ну и главное - все это ОЧЕНЬ не быстрое.

vromanov ★★★
() автор топика

Вот lock-free алгоритм. Producer:

1. Начинаешь писать сообщение.

2. Сначала его контрольная сумма.

3. Потом длина

4. Потом тело

5. Потом парочку байт с 0. Их в контрольную сумму не включать.

6. Потом следующее сообщение

Consumer:

1. Читаешь контрольную сумму.

2. Читаешь длину. Если она какая-то странная, то все сообщений нет.

3. Если длина норм, читаешь все сообщение, подсчитывая контрольную сумму. Если совпало, то с помощью CAS пытаешься в последние пару байт записать свой номер consumer только если там 0. Если удалось - сообщение твое.

4. Читаешь следующие сообщение до первого поломанного сообщения или сообщения со странной длиной.

Минусы:

1. Контрольные суммы.

2. Работает только с polling

3. По-хорошему между записями в память без синхронизации нету отношения happens-before. Тут вроде должны спасать контрольные суммы, а так хз.

4. Нет гарантий справедливости распределения нагрузки. Но ты же сказал что считав сообщение его надолго уходят обрабатывать. Тогда вроде норм

Ну и не забудь добавить костылей для перезаписи и вращения по адресному пространству - circle buffer все-таки

vertexua ★★★★★
()
Последнее исправление: vertexua (всего исправлений: 1)
Ответ на: комментарий от vromanov

Ну и главное - все это ОЧЕНЬ не быстрое.

400000 512-байтных трансферов в секунду через pipe - это «небыстрое»? Уточним, что я получил цифру 400000 на каждом из двух каналов, данные через которые шли одновременно, причем на каждой стороне такой же поток читался из /dev/zero и писался в /dev/null, так что можно смело умножать цифру на 1.5

Как вы себе представляете использование pipe если общение происходит между двумя сервисами и один сервси рестартует?

Тривиально. Когда сервис рестартует, его канал закрывается, возникает EOF на чтении или ошибка на записи. И в чем проблема? Если умер отправитель - диспетчер закрывает его канал и ждет нового. Если умирает обработчик - диспетчер закрывает его канал и ждет нового. Когда рестартует диспетчер, подписчики и отправители перерегистрируются на него. Диспетчеров может быть много, подписчиков может быть много, отправителей может быть много. Можно явно перекоммутировать подписчиков и отправителей, можно разделять их по нодам.

Ну не вижу проблем, вот хоть как - не вижу.

no-dashi ★★★★★
()
Ответ на: комментарий от vromanov

И от задачи lockfree queue мы приходим к задаче lockfree allocator

Стэк сообщений фиксированных размеров.

mv ★★★★★
()
Ответ на: комментарий от mv

размер может СИЛЬНО отличаться. От 100 байт до 2^24 по стандарту. На практике 200-10000 байт. ладно, будем сами придумывать..

vromanov ★★★
() автор топика
Ответ на: комментарий от vromanov

Вам не подходит алгоритм с контрольными суммами? Он обычно для дисковых очередей или распределенных, но мог бы сработать.

vertexua ★★★★★
()
Последнее исправление: vertexua (всего исправлений: 1)

Это вы про DIAMETER все пишете? Там же вроде бы ничего сильно уж затейливого, зачем вам такое буйство технологий?

То есть про сам DIAMETER я только в википедии читал, но несколько лет назад сталкивался с телекоммуникационными биллингами, которые аккаунтили RADIUSом. Цифры в десятки тысяч запросов/ответов в секунду никаких эмоций не вызывали. И это было написано, извиняюсь, на Perl с PostgreSQL, прямо вот в лоб: получил запрос, посмотрел в кеш, если нет, то сходил в базу - отдал ответ.

Вам, конечно, виднее как там и что, но то что вы пишете выглядит не очень логично. Если бы вы рассказали зачем эти разбиения на процессы, интенсивное дерганье СУБД в памяти + тонны текстовых логов, и почему вы думаете что все упирается именно в локи, возможно дискуссия заиграла бы другими красками

anonymous
()
Ответ на: комментарий от anonymous

Radius это мелочь по сравнению с диаметром. Как CSV с xml. По производительности и фичам мы уже сравнимы с лидерами рынка и есть куда расти. Ну и сценарии использования куда сложнее чем при использовании радиуса. Наши доки и вообще сам сервер тут freepcrf.com

vromanov ★★★
() автор топика
Ответ на: комментарий от vromanov

размер может СИЛЬНО отличаться. От 100 байт до 2^24 по стандарту. На практике 200-10000 байт. ладно, будем сами придумывать..

Динамическое выделение 2^24 байт быстрым не будет, можете использовать штатный malloc() и про локи не думать даже.

Если это понты, и «на практике 200-10000 байт», то держите несколько стэков с разными размерами, типа 256, 4096, 16384.

mv ★★★★★
()
Ответ на: комментарий от vromanov

Посыл понятен: с таким размером сообщений о lock contention можно не думать.

mv ★★★★★
()
Ответ на: комментарий от anonymous

Сейчас все бросим и перепишем... особенно будет интересно увидеть код парсинга диаметра на эрланге.

vromanov ★★★
() автор топика
Ответ на: комментарий от vromanov

У Эрланга очень приятный поточный бинарный парсер.

mv ★★★★★
()
Ответ на: комментарий от vromanov

Сейчас все бросим и перепишем... особенно будет интересно увидеть код парсинга диаметра на эрланге.

Фу как некрасиво гордиться собственным невежеством. Писать парсеры на ерланге — сплошное удовольствие.

Вот ваш диаметр, правда не знаю насколько полный: http://bit.ly/16c4Htp

ebantrop
()

Хоть меня и игнорят, но спасибо за тред, попробую кстати на досуге запилить свое решение just for lulz

vertexua ★★★★★
()
Ответ на: комментарий от ebantrop

However, the result shows that Erlang Diameter performs much slower. With 3600 Diameter messages per second, Erlang is using 100% of CPU while the in-house Diameter stack is only using 5%. И это не ПРОСТЕЙШЕМ тесте.

vromanov ★★★
() автор топика
Ответ на: комментарий от vertexua

Не игнорят. Просто задачей было найти ПРОВЕРЕННОЕ решение. А так и своих идей хватает. В предложенном варианте меня пугает большая вычислительная сложность взятия сообщения и то, что при блокировке (точнее при одноврменном приходе несколькоих потоков за сообщением) она может неоднократно повторяться.

Плюс хочется сделать вариант с таким-же апи как и существующее решение.

vromanov ★★★
() автор топика
Ответ на: комментарий от vromanov

при блокировке (точнее при одноврменном приходе несколькоих потоков за сообщением) она может неоднократно повторяться.

Да, каждое ядро вычислит сумму как минимум раз, хоть и достаточно быстро (много гигабайтов в секунду). А вот много раз каждое ядро будет ее вычислять в относительно редких случаях одновременного чтения consumerом и записи producerом. Различные костыли на подобии magic numbers могут вероятностно ускорить проверки.

Вообще мопед не мой, а гугла. Но там это очередь на основе Google File System. Но там файлы и еще и распределенные.

vertexua ★★★★★
()
Последнее исправление: vertexua (всего исправлений: 1)
Ответ на: комментарий от vertexua

Почему по разу то? Представим что 10 потоков начали выбирать сообщение одновременно. все посчитали контрольную сумму и один из них взял сообщение. Потом 9 считают контрольную сумму второго собщения, 8 третьего итд.. Итого сложность растет квадратично от числа потоков. Если еще количество потоков больше числа ядер то они умудряются в это время выжрать 100% ЦПУ, мешают друг-другу и другим потокам.

vromanov ★★★
() автор топика
Ответ на: комментарий от vromanov

Ну я как раз и сказал, каждое ядро посчитает контрольную сумму каждого сообщения как минимум один раз, иногда больше. Но это не так печально как кажется. С точки зрения каждого потока (которых КАК ВСЕГДА лучше бы было столько сколько ядер) он просто читает данные подряд в памяти, иногда останавливаясь. Если сообщений хватает, то читает и считает контрольные суммы очень быстро. Я еще раз говорю что речь о многих гигабайтах в секунду. Вероятное такой процесс сможет посчитать контрольную сумму всей вашей ОЗУ быстрее чем за 1 сек.

Зато нет вызовов ядра вообще никаких, никаких блокировок. Ну кроме естественно ваших коллбеков, мы все-таки polling делаем. Но как только пришел event, то каждый поток может одним махом хоть по гектару сообщений проглотить. Просто уходите почаще в обработку и они не будут толкаться на CAS операциях

vertexua ★★★★★
()
Ответ на: комментарий от vromanov

они умудряются в это время выжрать 100% ЦПУ

Вообще-то нет. Они моментально проглотят и поделят все сообщения и будут ждать нового коллбека.

vertexua ★★★★★
()
Ответ на: комментарий от vromanov

Просто потоки крутятся в цикле и ждут освобождения лока

про wait(2) уже говорили?

drBatty ★★
()
Ответ на: комментарий от vromanov

Вы пробовали засунуть в pipe мегабайт не вычитывая с другой стороны? Как вы себе представляете использование pipe если общение происходит между двумя сервисами и один сервси рестартует? Ну и главное - все это ОЧЕНЬ не быстрое.

что значит «не быстрое»? Быстрее не бывает. Просто не может быть. Быстрее pipe только самописный pipe, но лишь на твоём локалхосте.

drBatty ★★
()
Ответ на: комментарий от vromanov

размер может СИЛЬНО отличаться. От 100 байт до 2^24 по стандарту. На практике 200-10000 байт. ладно, будем сами придумывать..

распили на куски, разве это сложно?

drBatty ★★
()
Ответ на: комментарий от vertexua

Вероятное такой процесс сможет посчитать контрольную сумму всей вашей ОЗУ быстрее чем за 1 сек.

Ох, сказочник...

mv ★★★★★
()
Ответ на: комментарий от vromanov

А ну тогда... lock free circular buffer уже советовали? тем более что макс. размер шаред мемори ограничен.

nerdogeek
()
Ответ на: комментарий от vromanov

Наши доки и вообще сам сервер тут freepcrf.com

Я не понял, оно Opensource или нет? Если да, то где взять исходники?

Sorcerer ★★★★★
()

пиши мыло, пришлю тебе уже готовую на C++11. можно прикрутить futex, будет еще и с ожиданием

100% без блокировок, асинхронная

ckotinko ☆☆☆
()
Последнее исправление: ckotinko (всего исправлений: 1)
Ответ на: комментарий от pulo

хренли там смотреть. single-linked-list сообщений, новые добавляются путем

struct message {
   message * next;
}

struct queue {
   message * head;//initila=null
   std::atomic<message *> io;//initial=null
};

void push(queue * q, message * m)
{
   message * x=q.io_load()
   do{
      m->next=x;
   }while(!q->io.compare_exchange_weak(x, m);
}

message * pop(queue * q)
{
   message * n, * p, * x;
   if(q->head)
   {
      x=q->head;
      q->head=x->next;
      return x;
   }
   else
   {
      x = q->io.exchange(0);
      if(x)
      {
         p = 0;
         while(x->next)
         {
            n=x->next;
            x->next=p;
            p=x;
            x=n;
         }
         x->next=p;
      }
   }
   return x;
}

ckotinko ☆☆☆
()
Ответ на: комментарий от ckotinko

как head перестает быть NULL?

if(q->head)
   {
      x=q->head;
      q->head=x->next;
      return x;

Что если два потока начнут исполнять этот код? Ну и для особого кайфа, head показывает на последний элемент.

vromanov ★★★
() автор топика
Ответ на: комментарий от vromanov

push добавляет ссылку на сообщение в стек.

pop забирает из очереди(head) либо подбирает весь стек оптом, разворачивает его(стек LIFO, нам надо FIFO) и вставляет в очередь.

если 2 потока, то fail. используйте N очередей, ибо существующие архитектуры CPU не позволят вам сделать что-либо сверх того, что я написал.

ckotinko ☆☆☆
()
Последнее исправление: ckotinko (всего исправлений: 1)
Ответ на: комментарий от vromanov

даже уточню - вам наверно нужен load-balancing. для этого вам не нужно много чтецов на очередь. вам нужен механизм для оповещения кто готов читать. здесь надо подумать, ибо очередь писалась не под этот случай. можно будет сделать через futex но, как вы понимаете, я уже устал, и мне в лом.

ежели желаете, можете написать мне куда послать код с futex и прочей мурой, и взамен прислать мне рекомендательное письмо(что такой-то чел взял и запилил)

ckotinko ☆☆☆
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.