{"id":14285,"url":"\/distributions\/14285\/click?bit=1&hash=346f3dd5dee2d88930b559bfe049bf63f032c3f6597a81b363a99361cc92d37d","title":"\u0421\u0442\u0438\u043f\u0435\u043d\u0434\u0438\u044f, \u043a\u043e\u0442\u043e\u0440\u0443\u044e \u043c\u043e\u0436\u043d\u043e \u043f\u043e\u0442\u0440\u0430\u0442\u0438\u0442\u044c \u043d\u0430 \u043e\u0431\u0443\u0447\u0435\u043d\u0438\u0435 \u0438\u043b\u0438 \u043f\u0443\u0442\u0435\u0448\u0435\u0441\u0442\u0432\u0438\u044f","buttonText":"","imageUuid":""}

Чтение из Kafka без инцидентов

Знакомясь с продуктом Apache Kafka, вы можете наткнуться такое утверждение – в Kafka соблюдается принцип «тупой брокер – умный потребитель» (для RabbitMQ, стало быть, наоборот). Но в этой статье речь пойдет не о брокере, а о потребителе. Дело в том, что потребитель умный только до пояса, не выше. Чтобы реализовать семантику Exactly-Once (строго однократная доставка) в конечной точке потребителя (обычно это БД), одного «умного» адаптера недостаточно.

Какой бы механизм не использовал адаптер к Kafka: auto-commit или ручную фиксацию сдвига, в обоих случаях легко привести ситуацию, в которой, при падении сервера, вы получаете повторное чтение сообщений.

Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset.

И если у вас к БД идут исключительно идемпотентные операции – не страшно, но обычно, операция INSERT таковой не является.

Если в целевой таблице у вас есть уникальный индекс – хорошо, ситуация решается на уровне логики. В общем случае не рекомендуется в INSERT ставить на конфликт игнорирование дубликата, лучше поставить UPDATE, так как в топик могут пустить повторные сообщения за какой-то период.

Когда в целевой таблице нет индекса …

Если табличка короткая вы можете поместить натуральный или синтетический (номер партиции + смещение + метка времени) ключ сообщения в отдельное поле, чтобы определять и отбрасывать повторы. Впрочем этот вариант маловероятен, поскольку все короткие таблицы это справочники и в них есть уникальные ключи и индексы. Во всех остальных случаях имеет смысл применить более быстрое и надежное решение.

  1. Нам понадобится внутритранзакционный элемент – табличка, которая будет синхронно с бизнес-операцией коммититься или откатываться.
  2. Отключаем опцию auto-commit и устанавливаем для себя количество сообщений (commit-size), после обработки которых мы будем фиксировать сдвиг в очереди брокера.
CREATE TABLE topic ( id bigserial, key varchar(50) -- синтетический ключ );

Регламент чтения и обработки сообщения такой …

  1. Получаем из Kafka N записей (commit-size).

  2. Выясняем ключи, которых нет в табличке.

  3. Открываем транзакцию.
  4. Выполняем бизнес-операции с действительно новыми сообщениями.
  5. Добавляем в таблицу обработанные ключи.
  6. Укорачиваем табличку до нужного размера (commit-size).
  7. Фиксируем транзакцию.
  8. Фиксируем смещение в Kafka.

Скрипт для подчистки таблицы …

DELETE FROM topic WHERE id <= (SELECT id FROM topic ORDER BY id DESC LIMIT 1 OFFSET 5);

Ставьте в OFFSET значение в 2-3 раза больше commit-size (на всякий случай). Если commit-size будет большой, поставьте на id индекс с обратной сортировкой. Делайте таблички для каждого топика индивидуально.

Теперь, при падении сервера или докера, вы не получите дубликаты сообщений в целевой таблице.

0
Комментарии
-3 комментариев
Раскрывать всегда