LINUX.ORG.RU

Как сочетать очередь и БД надёжно и транзакционно?

 ,


0

4

Есть два процесса, работающих с одними данными. Один процесс изменяет данные. К примеру создаёт новую запись. Второй процесс должен понять, что запись создалась и обработать её.

У меня возникают проблемы с тем, как правильно спроектировать транзакцию в БД и работу с очередью.

Первый подход:

  1. Вставляем строку
  2. Коммитим транзакцию.
  3. Отправляем в очередь событие с ID строки.
  4. Получатель получает событие.
  5. Получатель запрашивает данные по ID и работает с ними.

Тут возникает проблема между шагами 2 и 3. Если процесс умер, то запись окажется в БД, а получатель про это не узнает.

Второй подход:

  1. Вставляем строку
  2. Отправляем в очередь событие с ID строки.
  3. Коммитим транзакцию.
  4. Получатель получает событие.
  5. Получатель запрашивает данные по ID и работает с ними.

Тут тоже возникает проблема между шагами 2 и 3. Во-первых транзакция может не закоммититься, а событие уже ушло. Во-вторых получатель может получить событие раньше, чем транзакция закоммитится и не увидит данные.

Можно накрутить какую-то сложную архитектуру с дополнительным полем-статусом, двумя коммитами, «восстановлением» если второй коммит не сработал, получателем, который нормально отрабатывает двойные сообщения. Расписывать не буду, но это всё прям очень сложно выходит.

Самый простой вариант это использовать что-то вроде postgres listen/notify, который умеет отсылать события транзакционно. Но интересует именно работа с внешней очередью, без всяких там двухфазных транзакций и подобного энтерпрайза. Кажется, будто упускаю что-то очевидное.

Если нужна конкретика - пускай будет postgres и kafka

★★★★★

Последнее исправление: vbr (всего исправлений: 1)

Получатель запрашивает данные по ID и работает с ними

Нельзя чтобы получатель запрашивал не по ID, а по timestamp?

«Я тут у вас был такого-то во столько-то. Говорят что-то новое появилось у вас? Посмотрите что там свежее и отдайте мне, пжлста».

Примерно в таком жанре как-то делал одну штуку.

Toxo2 ★★★★
()

Гм. Если они работают с одной базой, то нафига еще какая-то внешняя очередь?

Не проще ли писать в отдельную таблицу ИДшники записей требующих обработки, а обработчик их будет удалять по мере обработки?

vel ★★★★★
()

Много вариантов. Зависит от того что ты делаешь и то что тебе надо получить. Смотри например как tcp работает, идея +- такая же. Ещё есть вариант несколько раз запрашивать если ответ не приехал.

peregrine ★★★★★
()
Ответ на: комментарий от vel

Не проще ли писать в отдельную таблицу ИДшники записей требующих обработки, а обработчик их будет удалять по мере обработки?

Как обработчик будет узнавать, что появилась новая запись? В цикле раз в секунду долбить select? Лишняя нагрузка на базу и задержка до секунды на каждый такой этап.

PS на самом деле сейчас именно так и сделано и именно это я хочу поменять на более «реактивный» подход.

PPS если бы постгрес умел что-то вроде блокирующего select, который бы возвращал результаты или ждал, пока они не появятся, было бы проще. Но я про такую фичу не в курсе.

vbr ★★★★★
() автор топика
Последнее исправление: vbr (всего исправлений: 2)

Ещё погуглил-подумал про такой вариант:

  1. Вставляем строку.
  2. Коммитим.
  3. Отдельный сервис слушает wal-логи.
  4. По получении новой транзакции анализирует её и если условия совпадают то отсылает сообщения.

Кажется debezium так и делает.

Наверное так правильно, да?

vbr ★★★★★
() автор топика
Ответ на: комментарий от urxvt

Все зависит от самой задачи. Тебе эти секунды так важны?

Я хочу, чтобы всё работало моментально. Эти секунды это признак неправильной архитектуры на мой взгляд.

Почему сразу не писать в очередь, зачем еще и в БД?

Помимо статуса непременно ещё что-то будет меняться. От записи в БД не уйти. Или предлагается вообще всё перевести на какой-то CQRS, где все данные в очереди?

vbr ★★★★★
() автор топика
Ответ на: комментарий от urxvt

Ну третий сервис предположительно уже написан и существует, с этим не так страшно. Хотя я пока эту тему не изучал, так, мысли вслух. Я пытаюсь понять, как минимизировать сложность в моём коде. Если это выливается в перенос сложности в готовый сервис, ну и славно.

vbr ★★★★★
() автор топика
Ответ на: комментарий от vbr

Я хочу, чтобы всё работало моментально.

В кампуктере и, тем более, сети ничего не работает моментально.

Эти секунды это признак неправильной архитектуры на мой взгляд.

У тебя неверный взгляд. Тысячи сервисов с асинхронной обработкой этому доказательство.

Помимо статуса непременно ещё что-то будет меняться. От записи в БД не уйти. Или предлагается вообще всё перевести на какой-то CQRS, где все данные в очереди?

Я предлагаю писать сразу в Кафку и уже второй сервис пускай сохраняет в БД. Кафка умеет exactly once.
Но я бы постарался сделать все на старом, добром DB pooling, если нету каких-то особых требований.

urxvt ★★★★★
()
Ответ на: комментарий от vbr

Если это выливается в перенос сложности в готовый сервис, ну и славно.

Ну это в теории и рекламе только так — поставил третий сервис и забыл.

urxvt ★★★★★
()

Отправляем в очередь событие с ID строки.

должно быть два события, 1- пришла строка(ID), 2 - строка закоммичена(ID).

другой процесс может начинать работу по первому, но понимать, что если не придет за некий таймаут событие 2, то надо делать некий откат или что там у тебя.

alysnix ★★★
()

Есть два процесса, работающих с одними данными. Один процесс изменяет данные. К примеру создаёт новую запись. Второй процесс должен понять, что запись создалась и обработать её.

а дальше пошла хрень :-)

вы сделали два процесса сильно связанными. Оба должны знать друг о друге, могут жить только совместно и синхронизовать работу с базой промеж собой. Когда процесс оправляет id записи базы второму процессу, то это и синхронизация и крах всей стабильности и не дай бог,секурности.

пусть оба процесса работают только и исключительно с базой. В упомянутой kafka streams, у pg notify - нормальные средства оповещений

MKuznetsov ★★★★★
()

Тебе нужны либо распределённые транзакции (но сразу придумавай как всё чинить когда оно взорвется), либо рассматривай очередь как ненадежный способ доставки. Т.е. вот есть у тебя объект А, который нужно отправить куда-то в очередь, там его съедят и сделают из него объект Б. А и Б ты хранишь в базе. У А есть статус доставки. Берешь А, отправляешь, получаешь с другой стороны, превращаешь в Б, Б пишешь в базу. Если всё ок - отпраляешь ответ и сохраняешь статус для А, что всё ок. При этом Б должны проверяться на уникальность. Соответственно если Б не записалось - реплеишь отправку А. Если почему-то статус не доехал, то дубль Б дропнется и тоже всё ок.

ya-betmen ★★★★★
()
Последнее исправление: ya-betmen (всего исправлений: 1)

Есть два процесса, работающих с одними данными. Один процесс изменяет данные. К примеру создаёт новую запись. Второй процесс должен понять, что запись создалась и обработать её.

Опосредованно от БД, напрашивается пайплайн, ну а механизмы реализации - это уже частности.

Тем более, что раз уже упомянута кафка, почему бы сразу не посмотреть на апач эйрфлоу?

vvn_black ★★★★★
()
Последнее исправление: vvn_black (всего исправлений: 1)

Во-первых, в сообщении незачем указывать никакие id, читатель сам их из базы узнает. Во-вторых

Тут возникает проблема между шагами 2 и 3. Если процесс умер, то запись окажется в БД, а получатель про это не узнает.

Надо уточнить, в чём тут конкретно проблема. Сообщение не дошло? А если процесс умер не успев ничего записать в базу, то оно тоже потеряется. Если не нравится что база замусоривается чем-то необработанным, то (ведь процесс после умирания перезапустится?) пусть он на старте проверяет нет ли в базе сообщений и если есть - шлёт сообщение в очередь.

Читатель по приёму события из очереди должен проверять наличие новых сообщений в базе, если они есть обрабатывать, если их нет - ничего не делать и не считать это за проблему.

firkax ★★★★★
()

Кажется, будто упускаю что-то очевидное

Как ты хочешь это не будет работать без кульбитов. Нужно в принципе отвязаться от вставки и обработки. Т.е. вместо вставки пуляешь событие с нужными данными. Два подписчика, один вставляет в БД, другой обрабатывает данные. Вместо ID из БД, если тебе нужен уникальный ключ, генеришь UUID как ключ. Гугли event sourcing

no-such-file ★★★★★
()
Последнее исправление: no-such-file (всего исправлений: 1)
Ответ на: комментарий от firkax

в сообщении незачем указывать никакие id, читатель сам их из базы узнает

id в сообщении служит неявным статусом. Если его там не будет, значит статус должен быть в БД. Ну и нагрузка на БД увеличивается: по id выбрать проще, чем даже по индексу (а если ещё БД дурковать с фуллсканом начнёт…). В целом понятно, помимо прочего читатель может, примеру, минуту ждать сообщения из очереди, а по прошествии минуты сделать селект на всякий случай, хуже не будет.

в чём тут конкретно проблема

Данные сохранились, но дальше обработка не пошла. Грубо говоря - покупатель сделал заказ на сайте, а менеджер про него не узнал.

А если процесс умер не успев ничего записать в базу, то оно тоже потеряется.

Иными словами процесс записи в БД и отправки в очередь предлагается сделать, как одну операцию и отправителю исходного сообщения, которое запустило всю операцию, отправлять HTTP 500 или Connection reset, если процесс умер или что-то подобное произошло. И пускай отправитель ещё раз кнопку тыкает или чего-то подобное делает. А второй раз вставку в БД нужно сделать идемпотентно (ну или вообще забыть про старую запись, считая её мусором и вставить новую копию, хотя этот вариант мне не очень нравится). Разумно, это мне в голову почему-то не пришло. Пожалуй это и есть самый правильный вариант в такой постановке.

vbr ★★★★★
() автор топика
Последнее исправление: vbr (всего исправлений: 2)
Ответ на: комментарий от anonymous

Если коротко то вижу так:

Продюсер делает вставку данных (не забываем про поля created и processed), created время создания записи, processed NULL, пуляем NOTIFY. Потребитель подписан на оповещение через LISTEN, при получении оповещения вычитывает все записи с processed IS NULL, побрабатывает и апдейтит эти записи выставляя время обработки.

anonymous
()
Ответ на: комментарий от anonymous

Спасибо, это по сути оптимизация варианта с пустым циклом. Нормальный вариант, я просто хотел понять, как можно скрестить внешнюю очередь. Только вместо описанного варианта лучше сделать в моём случае поле status, тем более там значений будет несколько и обработчиков для каждого статуса будет несколько, но это уже к теме отношения не имеет.

Ещё, к слову, notify можно пулять из триггера, если я правильно понимаю, чтобы вставлятор про это даже не думал.

vbr ★★★★★
() автор топика
Последнее исправление: vbr (всего исправлений: 1)
Ответ на: комментарий от vbr

Как обработчик будет узнавать, что появилась новая запись? В цикле раз в секунду долбить select? Лишняя нагрузка на базу и задержка до секунды на каждый такой этап.

listen/notify.

theNamelessOne ★★★★★
()
Ответ на: комментарий от anonymous

не забываем про поля created и processed

И чем, интересно, два поля лучше одного ts ? Как в последующем используется значение поля processed, помимо IS NULL?

LISTEN/NOTIFY работают на одном ПГ же. А если ПГ разные в разных концах континента? А если на одном конце PG (где клиенты хотят читать), а на другом MSSQL (куда другие клиенты пишут)?

Toxo2 ★★★★
()
Ответ на: комментарий от vbr

Класть в очередь id довольно кривая история. Что если за время пока id был в очереди запись в базе была изменена другой транзакцией? А если запись удалена? Ну допустим читатель очереди какое-то время был выключен, для обновления, например

cobold ★★★★★
()

Все таки задача сформулираванна слишком общо. На таком общем уровне можно порекомендовать самокорректирующися алгоритм. То есть отдельные сбои в информировании должны исправляться в последующем.

Psilocybe ★★★★★
()
Ответ на: комментарий от Toxo2

+1, в http это называется if-modified-since. А если надо разрешение менее секунды (да даже если и не надо), вместо времени можно юзать int-счётчик («время», «версия»).

dimgel ★★★★★
()

В очередь пиши идемпотентно, при чтении из очереди коммить офсеты в ту же базу в которую пишешь данные в одной транзакции.

Reset ★★★★★
()

Кстати ChatGPT подсказал шикарный паттерн. Кажется это вообще лучший вариант.

To design a system that combines database transactions and event messaging in a durable way, consider implementing the Outbox Pattern. This pattern involves adding an intermediate step in your database to store events or messages that need to be published to the event queue. The workflow for Service A would look like this:

  1. Begin Transaction.
  2. Insert Row into your business table.
  3. Insert Event into an «outbox» table within the same transaction. This event contains all the necessary information for the next service (e.g., Service B) to process, such as the row ID. 4 Commit Transaction. At this point, both the business data and the event are durably saved in the database.
  4. Publish Event to the queue from the outbox table. This can be done by a separate process that scans the outbox table for new events and publishes them to the event queue. After a successful publish, the event can be marked as published or deleted from the outbox table to avoid re-publishing.

For Service B, the workflow remains as you described. It listens for messages from the event queue, begins a transaction to update the row, and commits the transaction.

By using the Outbox Pattern, you ensure that:

  • The database operation and the creation of the event are atomic. If the application crashes before the transaction is committed, neither the business operation nor the event will be persisted, maintaining consistency.
  • The event will only be published if the transaction succeeds, avoiding situations where an event is published without the corresponding database change.
  • You decouple the transactional work from event publishing, which can improve performance and reliability.
  • For the process that publishes events from the outbox to the event queue, you can implement a polling mechanism or use database features like Change Data Capture (CDC) if supported, to react to new events in the outbox table more efficiently.

This design improves fault tolerance and ensures that events corresponding to database changes are not lost, even if a service crashes immediately after committing a transaction.

Получается, что мы свели задачу к тому, чтобы для заданной записи в таблице outbox создать сообщение в message queue и потом удалить запись в таблице outbox. Это, конечно, тоже требует координации между базой данных и очередью сообщений. Тут либо будут (в случае проблем) двойные сообщения, либо нужна какая-то фича от системы сообщений для предотвращения двойных сообщений. В целом обе проблемы решить проще, чем исходную. Надо ещё покумекать, но кажется это тоже можно решить с ещё одной таблицей для полученных сообщений. Или мы обновляем БД и пишем, что сообщение с таким-то ID получили (в одной транзакции), или всё откатываем и тогда сообщение обработаем ещё раз.

vbr ★★★★★
() автор топика
Последнее исправление: vbr (всего исправлений: 1)
Ответ на: комментарий от MOPKOBKA

Мне не нужна скорость. Мне нужна реактивность. Это значит обработка данных с максимально возможной скоростью. Как только один процесс свою работу завершил, второй должен начать свою работу, не ожидая ни одной лишней миллисекунды. БД способна обрабатывать тысячи транзакций в секунду. Этой скорости мне более чем достаточно.

vbr ★★★★★
() автор топика

Есть два процесса, работающих с одними данными.

А может ну его нафиг? У вас явно две сильносвязные сущности, которые жить друг без друга не могут. Так может обручить их и пусть живут долго и счастливо и умрут в один день? Надо только назвать современно-модно-молодёжно, чтобы все поняли, это не устаревший монолит, а, напротив, следующая ступень после микросервисов.

ugoday ★★★★★
()
Ответ на: комментарий от vbr
  1. Insert Event into an «outbox» table within the same transaction.

Ну явно же лишнее телодвижение. Получаются две параллельные таблицы, причём в случае автоинкремента PK – даже с одинаковыми ID.

dimgel ★★★★★
()
Последнее исправление: dimgel (всего исправлений: 1)
Ответ на: комментарий от dimgel

Смысл в том, что это универсальная таблица для всех сообщений. Из исходного сообщения не очевидно, но в общем случае таких сервисов много, которые по очереди работают над какими-то данными. И отправку из event в очередь тоже можно сделать вообще отдельным сервисом, который про специфику исходной таблицы ничего не знает.

vbr ★★★★★
() автор топика
Ответ на: комментарий от vbr

Дальнейшее исследование вопроса привело к тому, что это абсолютно стандартный, широко известный в узких кругах, паттерн Transactional outbox, и в случае kafka для его реализации имеется Debezium Outbox Event Router который работает быстро и надёжно.

vbr ★★★★★
() автор топика
Последнее исправление: vbr (всего исправлений: 2)

Может не бд, а fifo использовать, один процесс пишет, второй ожидает-читает-забирает. Просто и надёжно не надо сотрясать базу данных запросами, нет ли чего для меня нового?

s-warus ★★★★
()
Последнее исправление: s-warus (всего исправлений: 1)
Ответ на: комментарий от vbr

скорее анти-паттерн или костыль, т.к. если сломается на чтении этого аутбокса все равно возникнет нарушение целостности, но очевидно никакого более лучшего варианта для «распределенных транзакций» не придумали. Главное ведь чтобы разработчики были всегда виноваты:)?

Syncro ★★★★★
()