Чтение из Kafka без инцидентов
Знакомясь с продуктом Apache Kafka, вы можете наткнуться такое утверждение – в Kafka соблюдается принцип «тупой брокер – умный потребитель» (для RabbitMQ, стало быть, наоборот). Но в этой статье речь пойдет не о брокере, а о потребителе. Дело в том, что потребитель умный только до пояса, не выше. Чтобы реализовать семантику Exactly-Once (строго однократная доставка) в конечной точке потребителя (обычно это БД), одного «умного» адаптера недостаточно.
Какой бы механизм не использовал адаптер к Kafka: auto-commit или ручную фиксацию сдвига, в обоих случаях легко привести ситуацию, в которой, при падении сервера, вы получаете повторное чтение сообщений.
Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset.
И если у вас к БД идут исключительно идемпотентные операции – не страшно, но обычно, операция INSERT таковой не является.
Если в целевой таблице у вас есть уникальный индекс – хорошо, ситуация решается на уровне логики. В общем случае не рекомендуется в INSERT ставить на конфликт игнорирование дубликата, лучше поставить UPDATE, так как в топик могут пустить повторные сообщения за какой-то период.
Когда в целевой таблице нет индекса …
Если табличка короткая вы можете поместить натуральный или синтетический (номер партиции + смещение + метка времени) ключ сообщения в отдельное поле, чтобы определять и отбрасывать повторы. Впрочем этот вариант маловероятен, поскольку все короткие таблицы это справочники и в них есть уникальные ключи и индексы. Во всех остальных случаях имеет смысл применить более быстрое и надежное решение.
- Нам понадобится внутритранзакционный элемент – табличка, которая будет синхронно с бизнес-операцией коммититься или откатываться.
- Отключаем опцию auto-commit и устанавливаем для себя количество сообщений (commit-size), после обработки которых мы будем фиксировать сдвиг в очереди брокера.
Регламент чтения и обработки сообщения такой …
Получаем из Kafka N записей (commit-size).
Выясняем ключи, которых нет в табличке.
- Открываем транзакцию.
- Выполняем бизнес-операции с действительно новыми сообщениями.
- Добавляем в таблицу обработанные ключи.
- Укорачиваем табличку до нужного размера (commit-size).
- Фиксируем транзакцию.
- Фиксируем смещение в Kafka.
Скрипт для подчистки таблицы …
Ставьте в OFFSET значение в 2-3 раза больше commit-size (на всякий случай). Если commit-size будет большой, поставьте на id индекс с обратной сортировкой. Делайте таблички для каждого топика индивидуально.
Теперь, при падении сервера или докера, вы не получите дубликаты сообщений в целевой таблице.