{"id":13516,"url":"\/distributions\/13516\/click?bit=1&hash=37bd7b4748a2966bbc26730b25e2618c42f364e4b1fef4e1064b7cb954a0c2b0","title":"\u041f\u043e\u043b\u0443\u0447\u0438\u0442\u044c \u0438\u043d\u0432\u0435\u0441\u0442\u0438\u0446\u0438\u0438 \u043e\u0442 \u00ab\u0413\u0430\u0437\u043f\u0440\u043e\u043c \u043d\u0435\u0444\u0442\u0438\u00bb","buttonText":"\u0417\u0430 \u0447\u0442\u043e?","imageUuid":"9ff0d7f7-ef07-5cab-961b-7241d5749f52","isPaidAndBannersEnabled":false}
Лысенко Андрей

Чтение из Kafka без инцидентов

Знакомясь с продуктом Apache Kafka, вы можете наткнуться такое утверждение – в Kafka соблюдается принцип «тупой брокер – умный потребитель» (для RabbitMQ, стало быть, наоборот). Но в этой статье речь пойдет не о брокере, а о потребителе. Дело в том, что потребитель умный только до пояса, не выше. Чтобы реализовать семантику Exactly-Once (строго однократная доставка) в конечной точке потребителя (обычно это БД), одного «умного» адаптера недостаточно.

Какой бы механизм не использовал адаптер к Kafka: auto-commit или ручную фиксацию сдвига, в обоих случаях легко привести ситуацию, в которой, при падении сервера, вы получаете повторное чтение сообщений.

Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset.

И если у вас к БД идут исключительно идемпотентные операции – не страшно, но обычно, операция INSERT таковой не является.

Если в целевой таблице у вас есть уникальный индекс – хорошо, ситуация решается на уровне логики. В общем случае не рекомендуется в INSERT ставить на конфликт игнорирование дубликата, лучше поставить UPDATE, так как в топик могут пустить повторные сообщения за какой-то период.

Когда в целевой таблице нет индекса …

Если табличка короткая и партиций в топике никогда не будет больше одной, то вы можете поместить натуральный или синтетический (номер партиции + смещение + метка времени) ключ сообщения в отдельное поле, чтобы определять и отбрасывать повторы. Впрочем этот вариант маловероятен, поскольку все короткие таблицы это справочники и в них есть уникальные ключи и индексы. Во всех остальных случаях имеет смысл применить более быстрое и надежное решение.

  1. Нам понадобится внутритранзакционный элемент – табличка, которая будет синхронно с бизнес-операцией коммититься или откатываться.
  2. Отключаем опцию auto-commit и устанавливаем для себя количество сообщений (commit-size), после обработки которых мы будем фиксировать сдвиг в очереди брокера.
CREATE TABLE topic ( id bigserial, key varchar(50) -- синтетический ключ );

Регламент чтения и обработки сообщения такой …

  1. Получаем из Kafka N записей (commit-size).

  2. Выясняем ключи, которых нет в табличке.

  3. Открываем транзакцию.
  4. Выполняем бизнес-операции с действительно новыми сообщениями.
  5. Добавляем в таблицу обработанные ключи.
  6. Укорачиваем табличку до нужного размера (commit-size).
  7. Фиксируем транзакцию.
  8. Фиксируем смещение в Kafka.

Скрипт для подчистки таблицы …

DELETE FROM topic WHERE id <= (SELECT id FROM topic ORDER BY id DESC LIMIT 1 OFFSET 5);

Ставьте в OFFSET значение в 2-3 раза больше commit-size (на всякий случай). Если commit-size будет большой, поставьте на id индекс с обратной сортировкой. Делайте таблички для каждого топика индивидуально.

Теперь, при падении сервера или докера, вы не получите дубликаты сообщений в целевой таблице.

0
Комментарии
Читать все 0 комментариев
null