Паттерн Transactional Outbox

Какую потенциальную проблему видите в коде?

Паттерн Transactional Outbox

Сначала создается заявка в БД, после событие о создании отправляется в брокер сообщений(MQ) для оповещения другого сервиса о появлении новой заявки.
Здесь может произойти ошибка в момент отправки события в очередь, когда наша сущность уже создана в БД.
Это приведет к несогласованному состоянию, так как заявка будет хранится только в нашей системе, а другая система не узнает о ней.

Как решить эту проблему, может воспользуемся транзакцией и откатим сохранение, если при отправке события в очередь произойдет ошибка?

await using var transaction = await _unitOfWork.BeginTransaction(); await _applicationRepository.Add(application); // отслеживание заявки для вставки await _unitOfWork.SaveChanges(); // выполняется запрос на вставку в БД try { await _queue.Publish(new ApplicationCreatedEvent(application)); } catch(Exception e) { _logger.LogError(e, "Ошибка при отправке события в очередь"); await transaction.Rollback(); return; } await transaction.Commit();

Тут уже обработали проблему, но не все случаи, тк ошибка может произойти на стороне БД, когда будем коммитить изменения.
Событие в очередь отправили, а у себя не сохранили, тоже не согласованность.
Проблема заключается в том, что эти два действия - сохранение состояния и публикация события, обычно не происходят атомарно.Поэтому на любом из этих этапов может возникнуть сбой и одно из действий не будет выполнено, что приведет к несогласованному состоянию системы.

Тут нам поможет паттерн Transactional Outbox - как избежать потери сообщений в микросервисной архитектуре.

В качестве решения предлагается атомарно в одной транзакции сохранить изменения сущности и само событие, а вторым шагом выполнять публикацию ранее сохраненного события.
Если сущность и соответствующее ему событие сохраняются в рамках одной транзакции, то это гарантирует, что данные не будут потеряны.

1. В одной транзакции создаем сущность и событие (в другой таблице), которое должно отправиться в очередь.

await _applicationRepository.Add(application); await _eventRepository.Add(new ApplicationCreatedEvent(application)); await _unitOfWork.SaveChanges();

2. Отправкой событий из базы в брокер будет заниматься отдельный процесс - Outbox processor.
Реализовать можно в том же сервисе в виде фоновой задачи на основе IHostedService (в ASP.NET Core), Cron джобы или развернуть отдельный Worker Service для этого процесса.
Есть одна проблема - с какой частотой сканировать БД для отправки сообщений в очередь?
Нужно учитывать такие факторы: как часто сообщения приходят, насколько быстро нужно доставлять сообщения, производительность БД и тп.

Паттерн Transactional Outbox

Как работает Outbox Processor?

Он берет пачку событий из базы и обрабатывает данные события, отправляя их в очередь, а после удаляет или помечает события как отправленные (ниже пример отправки события).

async Task SendEvent(ApplicationCreatedEvent applicationCreatedEvent) { try { await _queue.Publish(applicationCreatedEvent); await _eventRepository.Delete(applicationCreatedEvent); } catch(Exception e) { _logger.LogError(e, "Ошибка при обработке события"); } }

Теперь проблема потери сообщений решена, но все ли это или может есть еще какая-то проблема?

Осталась проблема с дублями событий в очереди, тк при удалении события может произойти ошибка, а оно уже было отправлено в очередь.
Для этого нужно реализовать идемпотентную обработку событий - продюсеру необходимо отправлять идентификатор события в сообщении, а подписчику проверять по идентификатору ранее обработанные события.

P.S.:

Если сообщения не настолько критичны, то профукать одну отправку не страшно и можно оставить реализацию как в первом варианте и не пользоваться данным паттерном.

Если нужна более сложная обработка транзакций, то использовать SAGA, чтобы обрабатывать откаты уже выполненных транзакций со сложной цепочкой взаимодействия, хорошо подойдет для распределенных систем.

3
Начать дискуссию