Мой первый опыт обработки вебхуков: как я учился делать надёжный бэкенд на Python

Я студент, изучаю backend-разработку на Python. Недавно в рамках учебного проекта столкнулся с задачей: нужно было сделать интеграцию с платёжным сервисом. Они присылают уведомление (вебхук), когда пользователь оплатил заказ, а я должен обновить статус в базе.

Поначалу я думал: «Что тут сложного? Просто эндпоинт напишу». Но когда начал копаться глубже, выяснилось, что всё не так просто. В этой статье хочу рассказать, как я пришёл от простого скрипта к архитектуре с очередью задач, и какие грабли при этом собрал. Надеюсь, мой опыт поможет другим новичкам не наступать на те же шишки.

Как я сделал сначала (и почему это было плохо)

Первая версия моего кода выглядела примерно так:

@app.post("/webhook") async def webhook(request: dict): # Обработка прямо здесь update_database(request) send_email_to_user(request) return {"status": "ok"}

Логично же? Пришло событие → обновил базу → отправил письмо. На локальном сервере всё работало идеально. Но когда я попробовал протестировать это под нагрузкой (и просто на реальном интернете), начались странности.

  1. Таймауты. Платёжный сервис ждал ответа не больше 5 секунд. Если база данных тормозила или сервис отправки писем долго отвечал, я получал ошибку.
  2. Потеря данных. Один раз мой сервер упал прямо во время обработки. В итоге деньги у клиента списались, а у меня заказ остался в статусе «ожидает».
  3. Дубли. Иногда приходило два одинаковых уведомления. Моя база данных пыталась создать два одинаковых заказа, и вылезала ошибка уникальности.

Я понял, что так работать нельзя. Нужно разделять «приём» и «обработку».

Как я искал решение

Начал гуглить «как надёжно обрабатывать фоновые задачи». Наткнулся на понятие очередей задач (Task Queues).

Идея мне понравилась:

  1. Вебхук просто кладёт задачу в очередь и сразу отвечает «ОК».
  2. Отдельный процесс (воркер) в спокойном темпе забирает задачи из очереди и делает всю тяжёлую работу.

Для очереди я выбрал Redis. Почему? Потому что он простой, быстрый и его легко поднять через Docker. Для веб-сервера взял FastAPI — он современный и асинхронный.

Архитектура, которая у меня получилась

Я нарисовал схему, чтобы самому лучше понять, как данные бегают. Вот что получилось:

Мой первый опыт обработки вебхуков: как я учился делать надёжный бэкенд на Python

Теперь даже если воркер упадёт, задача останется в Redis и дождётся перезапуска. А API отвечает мгновенно, поэтому платёжный сервис не ругается на таймауты.

Чтобы было понятнее, как это выглядит в системе, вот общая схема компонентов:

Мой первый опыт обработки вебхуков: как я учился делать надёжный бэкенд на Python

Мне нравится, что я могу запустить хоть 10 воркеров, если задач станет много. Это называется горизонтальное масштабирование.

Реализация: код, который у меня работает

Делюсь кодом. Он не идеален, но работает в моём проекте.

1. Приёмщик (API)

Самое важное здесь — не тормозить. Я только проверяю подпись и кидаю в Redis.

from fastapi import FastAPI, HTTPException, Header from pydantic import BaseModel import redis import json import hashlib import os app = FastAPI() # Подключаемся к Redis r = redis.Redis(host='localhost', port=6379, decode_responses=True) SECRET = os.getenv('WEBHOOK_SECRET', 'test_secret') def check_signature(body: str, sign: str) -> bool: """Простая проверка подписи""" my_sign = hashlib.sha256(f"{body}{SECRET}".encode()).hexdigest() return my_sign == sign @app.post("/api/payment") async def payment_webhook(request: dict, x_sign: str = Header(None)): # 1. Безопасность if not check_signature(json.dumps(request), x_sign): raise HTTPException(status_code=403, detail="Bad sign") # 2. Идемпотентность (защита от дублей) # Используем ID транзакции как ключ task_id = request.get('transaction_id') if r.exists(f"processed:{task_id}"): return {"status": "duplicate"} # 3. В очередь task = { "id": task_id, "amount": request.get('amount'), "user_id": request.get('user_id') } r.lpush("payment_queue", json.dumps(task)) # 4. Быстрый ответ return {"status": "accepted"}

2. Воркер (Обработчик)

Это отдельный скрипт, который я запускаю через терминал (или через systemd/Docker в продакшене). Он крутится в бесконечном цикле.

import redis import json import time import logging logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) def process_task(data): """Тут бизнес-логика""" log.info(f"Обрабатываем платеж {data['id']}") # Имитация работы с БД time.sleep(1) # Тут мог бы быть запрос к PostgreSQL return True def main(): r = redis.Redis(host='localhost', port=6379) log.info("Воркер запущен...") while True: # Ждем задачу из очереди (блокирующе, 5 секунд) task = r.brpop("payment_queue", timeout=5) if not task: continue _, raw_data = task data = json.loads(raw_data) try: success = process_task(data) if success: # Помечаем как обработанное (храним 1 день) r.setex(f"processed:{data['id']}", 86400, "1") log.info(f"Задача {data['id']} выполнена") except Exception as e: log.error(f"Ошибка: {e}") # Если ошибка - возвращаем задачу в очередь # (в реальном проекте лучше считать попытки) r.lpush("payment_queue", raw_data) time.sleep(5) if __name__ == "__main__": main()

С какими проблемами я столкнулся

Не всё прошло гладко, хочу честно рассказать о багах.

  1. Забыл про подписи. Сначала я не проверял подпись вебхука. Потом прочитал документацию платёжки и понял, что любой человек может отправить POST-запрос на мой сервер и создать себе баланс. Пришлось срочно добавлять проверку HMAC.
  2. Redis пропадал. Когда я перезагружал компьютер, данные в Redis исчезали (он же в памяти). Для учебного проекта это ок, но для реального нужно включать persistence (сохранение на диск) или использовать базу данных как очередь.
  3. Бесконечный цикл ошибок. Если в коде воркера ошибка, задача возвращалась в очередь и сразу же забиралась снова. Воркер уходил в цикл и грел процессор. Сейчас я добавил задержку (time.sleep) и счётчик попыток.

Что я понял в итоге

Эта задача помогла мне разобраться не только в вебхуках, но и в архитектуре в целом.

Что это дало:

  • API стал отвечать за 50 мс вместо 2 секунд.
  • Я перестал бояться, что сервер упадёт и данные потеряются.
  • Я научился работать с Redis не только как с кэшем, но и как с очередью.

Заключение

Для меня это был большой шаг от «просто кода» к «инженерному решению». Если вы тоже только начинаете и делаете интеграции — не ленитесь внедрять очереди сразу. Это сэкономит вам кучу нервов потом.

Начать дискуссию