Как мы укротили NATS в Go: Представляем deez-nats — наш ответ на сложность микросервисов

От разработчиков для разработчиков: почему мы устали писать бойлерплейт и создали свой фреймворк.

Как мы укротили NATS в Go: Представляем deez-nats — наш ответ на сложность микросервисов

В нашей архитектуре мы делаем большую ставку на NATS. Это фантастическая технология: быстрая, надежная, с поддержкой как простых Pub/Sub (Core), так и персистентных стримов (JetStream). Но по мере роста количества микросервисов мы столкнулись с проблемой, знакомой многим go-разработчикам.

Мы тратили 30% времени на бизнес-логику и 70% — на "обвязку" NATS. Бесконечные nc.Subscribe, ручной маршалинг JSON, обработка ошибок, управление таймаутами в RPC-вызовах... Код раздувался, а читаемость падала.

Нам нужен был инструмент, который бы давал удобство HTTP-фреймворков (вроде Gin или Echo), но для асинхронного месседжинга. Не найдя идеального решения, мы написали своё.

Встречайте deez-nats — библиотеку, которая превращает работу с NATS в удовольствие.

Философия deez-nats

Когда мы проектировали эту библиотеку, мы ставили во главу угла три принципа:

  1. Developer Experience (DX): API должен быть интуитивным.
  2. Type Safety: Мы любим Go за типизацию, поэтому библиотека активно использует дженерики (Generics).
  3. Production Ready: Graceful shutdown, middleware и контексты — это база, а не опция.

Давайте посмотрим, как это работает на практике.

Установка и Базовая настройка

Всё начинается стандартно. Библиотека требует Go 1.21+, так как мы используем современные фичи языка.

go get github.com/leinodev/deez-nats

В точке входа (main.go) мы инициализируем два основных сервиса: один для RPC (синхронные вызовы request-reply), другой — для событий.

package main import ( "context" "log" "github.com/nats-io/nats.go" "github.com/leinodev/deez-nats/natsevents" "github.com/leinodev/deez-nats/natsrpc" ) func main() { // Подключение к "голому" NATS nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() // Инициализация наших врапперов // Мы можем передать опции конфигурации прямо сюда rpcSvc := natsrpc.New(nc, natsrpc.WithBaseRoute("my-service")) eventsSvc := natsevents.New(nc) // ... регистрация хендлеров ... ctx := context.Background() // Запускаем слушателей в отдельных горутинах go rpcSvc.StartWithContext(ctx) go eventsSvc.StartWithContext(ctx) select {} // Блокируем main }

RPC: Забываем о ручном Publish

В чистом NATS реализация паттерна Request-Reply требует ручного управления Reply топиком. Мы автоматизировали это через абстракцию RPCContext.

Вот как выглядит типичный обработчик в нашем продакшене:

type UserRequest struct { ID string `json:"id"` } type UserResponse struct { Name string `json:"name"` Email string `json:"email"` } // Регистрируем типизированный хендлер rpcSvc.AddRPCHandler("users.get", func(ctx natsrpc.RPCContext) error { var req UserRequest // Request() сам распарсит JSON и вернет ошибку, если пейлоад битый if err := ctx.Request(&req); err != nil { return err } // Эмуляция бизнес-логики user, found := database.FindUser(req.ID) if !found { // Мы можем вернуть ошибку, и вызывающая сторона её получит return fmt.Errorf("user not found") } // Отправляем ответ. Библиотека сама замаршалит структуру в JSON return ctx.Ok(UserResponse{ Name: user.Name, Email: user.Email, }) })

А вот как мы вызываем этот метод из другого микросервиса:

var response UserResponse err := rpcSvc.CallRPC(ctx, "users.get", UserRequest{ID: "101"}, &response)

Никаких таймаутов подписки, никаких interface{}. Чистый, типизированный вызов.

Middleware и Группировка: Организуем код правильно

Когда проект растет, сваливать все маршруты в кучу — плохая идея. Мы внедрили концепцию Групп и Middleware, вдохновленную HTTP-роутерами.

Это позволяет нам, например, добавить логирование или проверку прав доступа только для определенной группы команд.

// Создаем группу маршрутов для админки adminGroup := rpcSvc.Group("admin") // Добавляем Middleware, который будет работать для всех ручек в этой группе adminGroup.Use(func(next natsrpc.RPCHandler) natsrpc.RPCHandler { return func(ctx natsrpc.RPCContext) error { log.Println("Accessing admin area...") // Тут могла быть проверка токена return next(ctx) } }) // Регистрируем хендлер: итоговый топик будет "my-service.admin.delete-user" // (при условии, что base route был "my-service") adminGroup.AddRPCHandler("delete-user", deleteUserHandler)

JetStream и Типизированные События

Работа с JetStream (JS) сложнее, чем с Core NATS, из-за необходимости управлять Consumer'ами и Ack-политиками. Мы постарались скрыть эту сложность, оставив гибкость.

Мы используем дженерики для строгой типизации событий. Больше никаких опечаток в названиях полей JSON.

type OrderCreated struct { OrderID string `json:"order_id"` Amount float64 `json:"amount"` } // Подписываемся на JetStream событие natsevents.AddTypedJetStreamJsonEventHandler( eventsSvc, "orders.created", func(ctx natsevents.EventContext[jetstream.Msg, any], event OrderCreated) error { log.Printf("Обработка заказа: %s на сумму %.2f", event.OrderID, event.Amount) // Обратите внимание: если функция возвращает nil, // библиотека автоматически отправит Ack сообщению! return nil }, // Опционально: указываем Durable Consumer и Queue Group для масштабирования natsevents.WithJetStreamHandlerQueue("order-processor-queue"), )

Килер-фича: Библиотека сама управляет Ack. Если ваш хендлер вернул ошибку, будет отправлен Nak (отрицательное подтверждение), и сообщение будет доставлено повторно согласно политикам JetStream.

Graceful Shutdown: Выходим красиво

В мире Kubernetes поды умирают и рождаются постоянно. Нельзя просто "убить" процесс — нужно дать текущим обработчикам завершить работу.

deez-nats реализует правильный паттерн завершения. Метод Shutdown:

  1. Перестает принимать новые сообщения.
  2. Ждет завершения активных хендлеров.
  3. Закрывает подписки и соединение.
// В main.go sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // Спокойно завершаем работу сервисов if err := rpcSvc.Shutdown(shutdownCtx); err != nil { log.Printf("Error shutting down RPC: %v", err) }

Итог

Мы создали deez-nats не как учебный проект, а как инструмент для решения реальных проблем в продакшене. Он позволил нам сократить объем кода в микросервисах, уменьшить количество ошибок, связанных с типами, и стандартизировать подход к общению между сервисами.

Библиотека поддерживает кастомные маршалеры (хотите Protobuf вместо JSON? Пожалуйста!) и полностью совместима с нативным клиентом nats.go.

Попробуйте внедрить её в свой проект: GitHub: leinodev/deez-nats

Будем рады вашим звездам, ишью и пулл-реквестам. Давайте делать Go-экосистему лучше вместе! 🥜

1
Начать дискуссию