Чтение из Kafka без инцидентов

Знакомясь с продуктом Apache Kafka, вы можете наткнуться такое утверждение – в Kafka соблюдается принцип «тупой брокер – умный потребитель» (для RabbitMQ, стало быть, наоборот). Но в этой статье речь пойдет не о брокере, а о потребителе. Дело в том, что потребитель умный только до пояса, не выше. Чтобы реализовать семантику Exactly-Once (строго однократная доставка) в конечной точке потребителя (обычно это БД), одного «умного» адаптера недостаточно.

Какой бы механизм не использовал адаптер к Kafka: auto-commit или ручную фиксацию сдвига, в обоих случаях легко привести ситуацию, в которой, при падении сервера, вы получаете повторное чтение сообщений.

Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset.

И если у вас к БД идут исключительно идемпотентные операции – не страшно, но обычно, операция INSERT таковой не является.

Если в целевой таблице у вас есть уникальный индекс – хорошо, ситуация решается на уровне логики. В общем случае не рекомендуется в INSERT ставить на конфликт игнорирование дубликата, лучше поставить UPDATE, так как в топик могут пустить повторные сообщения за какой-то период.

Когда в целевой таблице нет индекса …

Если табличка короткая вы можете поместить натуральный или синтетический (номер партиции + смещение + метка времени) ключ сообщения в отдельное поле, чтобы определять и отбрасывать повторы. Впрочем этот вариант маловероятен, поскольку все короткие таблицы это справочники и в них есть уникальные ключи и индексы. Во всех остальных случаях имеет смысл применить более быстрое и надежное решение.

  1. Нам понадобится внутритранзакционный элемент – табличка, которая будет синхронно с бизнес-операцией коммититься или откатываться.
  2. Отключаем опцию auto-commit и устанавливаем для себя количество сообщений (commit-size), после обработки которых мы будем фиксировать сдвиг в очереди брокера.
CREATE TABLE topic ( id bigserial, key varchar(50) -- синтетический ключ );

Регламент чтения и обработки сообщения такой …

  1. Получаем из Kafka N записей (commit-size).

  2. Выясняем ключи, которых нет в табличке.

  3. Открываем транзакцию.
  4. Выполняем бизнес-операции с действительно новыми сообщениями.
  5. Добавляем в таблицу обработанные ключи.
  6. Укорачиваем табличку до нужного размера (commit-size).
  7. Фиксируем транзакцию.
  8. Фиксируем смещение в Kafka.

Скрипт для подчистки таблицы …

DELETE FROM topic WHERE id <= (SELECT id FROM topic ORDER BY id DESC LIMIT 1 OFFSET 5);

Ставьте в OFFSET значение в 2-3 раза больше commit-size (на всякий случай). Если commit-size будет большой, поставьте на id индекс с обратной сортировкой. Делайте таблички для каждого топика индивидуально.

Теперь, при падении сервера или докера, вы не получите дубликаты сообщений в целевой таблице.

3737 показов
563563 открытия
Начать дискуссию