Как обработать миллион сообщений из kafka и не ждать вечность?!

Одним из средств необходимых для построения event-ориентированной архитектуры, является надежный брокер сообщений. Например Apache Kafka.

Kafka создавалась с учетом следующих требований:

  • Быть чрезвычайно быстрой
  • Предоставлять большую пропускную способность при работе с сообщениями
  • Поддерживать модели «Издатель-Подписчик» и «Точка-Точка»
  • Не замедляться с добавлением потребителей
  • Быть горизонтально масштабируемой; если один брокер, сохраняющий (persists) сообщения, может делать это только на максимальной скорости диска, то для увеличения производительности имеет смысл выйти за пределы одного экземпляра брокера
  • Разграничивать доступ к хранению и повторному извлечению сообщений

Чтобы выполнить требования, описанные выше, Kafka объединила обмен сообщениями типа «публикация-подписка» и «точка-точка» в рамках одного вида адресата — топика. Для полного понимания, как ведут себя топики и какие гарантии и возможности для масштабирования они предоставляют, нам нужно сначала рассмотреть, как они реализованы в Kafka.

У каждого топика в Kafka есть свой журнал (лог) . Продюсеры, отправляющие сообщения в Kafka, дописывают в этот журнал, а консюмеры читают из журнала с помощью указателей, которые постоянно перемещаются вперед. Периодически Kafka удаляет самые старые части журнала, независимо от того, были ли сообщения в этих частях прочитаны или нет. Основной частью дизайна Kafka является то, что брокер не заботится о том, прочитаны ли сообщения или нет — это ответственность клиента.

Журнал Kafka состоит из нескольких партиций. Kafka гарантирует строгую упорядоченность в каждой партиции. Это означает, что сообщения, записанные в партицию в определенном порядке, будут прочитаны в том же порядке. Каждая партиция реализована в виде цикличного (rolling) файла журнала, который содержит подмножество (subset) всех сообщений, отправленных в топик его продюсерами. Созданный топик содержит по-умолчанию одну партицию. Идея партиций — это основная идея Kafka для горизонтального масштабирования.

Как обработать миллион сообщений из kafka и не ждать вечность?!

Журнал Kafka состоит из нескольких партиций. Kafka гарантирует строгую упорядоченность в каждой партиции. Это означает, что сообщения, записанные в партицию в определенном порядке, будут прочитаны в том же порядке. Каждая партиция реализована в виде цикличного (rolling) файла журнала, который содержит подмножество (subset) всех сообщений, отправленных в топик его продюсерами. Созданный топик содержит по-умолчанию одну партицию. Идея партиций — это основная идея Kafka для горизонтального масштабирования.

Чтение сообщений

Клиент, читающий сообщения, управляет именованным указателем, называемым группа консюмеров (consumer group) , который указывает на смещение (offset) сообщения в партиции. Смещение — это позиция с возрастающим номером, которая начинается с 0 в начале партиции. Эта группа консюмеров, на которую ссылаются в API через определяемый пользователем идентификатор group_id, соответствует одному логическому потребителю или системе. Большинство систем, использующих обмен сообщениями, читают данные из адресата посредством нескольких экземпляров и потоков для параллельной обработки сообщений. Таким образом, обычно будет много экземпляров консюмеров, совместно использующих одну и ту же группу консюмеров. Проблему чтения можно представить следующим образом:

  • Топик имеет несколько партиций
  • Использовать топик может одновременно множество групп консюмеров
  • Группа консюмеров может иметь несколько отдельных экземпляров

Это нетривиальная проблема «многие ко многим». Чтобы понять, как Kafka обращается с отношениями между группами консюмеров, экземплярами консюмеров и партициями, рассмотрим ряд постепенно усложняющихся сценариев чтения.

Консюмеры и группы консюмеров

Давайте возьмем в качестве отправной точки топик с одной партицией (Рисунок 2). Рисунок 2.

Рисунок 2. Консюмер читает из партиции
Рисунок 2. Консюмер читает из партиции

Консюмер читает из партиции. Когда экземпляр консюмера подключается со своим собственным group_id к этому топику, ему назначается партиция для чтения и смещение в этой партиции. Положение этого смещения конфигурируется в клиенте, как указатель на самую последнюю позицию (самое новое сообщение) или самую раннюю позицию (самое старое сообщение) . Консюмер запрашивает (polls) сообщения из топика, что приводит к их последовательному чтению из журнала. Позиция смещения регулярно коммитится обратно в Kafka и сохраняется, как сообщения во внутреннем топике _consumer_offsets. Прочитанные сообщения все равно не удаляются, в отличие от обычного брокера, и клиент может перемотать (rewind) смещение, чтобы повторно обработать уже просмотренные сообщения. Когда подключается второй логический консюмер, используя другой group_id, он управляет вторым указателем, который не зависит от первого (Рисунок 3).

Рисунок 3. Два консюмера в разных группах консюмеров читают из одной партиции
Рисунок 3. Два консюмера в разных группах консюмеров читают из одной партиции

Таким образом, топик Kafka действует как очередь, в которой существует один консюмер и, как обычный топик издатель-подписчик (pub-sub) , на который подписаны несколько консюмеров, с дополнительным преимуществом, что все сообщения сохраняются и могут обрабатываться несколько раз. Рисунок 3. Два консюмера в разных группах консюмеров читают из одной партиции

Консюмеры в группе консюмеров

Когда один экземпляр консюмера читает данные из партиции, он полностью контролирует указатель и обрабатывает сообщения, как описано в предыдущем разделе. Если несколько экземпляров консюмеров были подключены с одним и тем же group_id к топику с одной партицией, то экземпляру, который подключился последним, будет передан контроль над указателем и с этого момента он будет получать все сообщения (Рисунок 4). Рисунок 4.

Рисунок 4. Два консюмера в одной и той же группе консюмеров читают из одной партиции
Рисунок 4. Два консюмера в одной и той же группе консюмеров читают из одной партиции

Два консюмера в одной и той же группе консюмеров читают из одной партицииЭтот режим обработки, в котором количество экземпляров консюмеров превышает число партиций, можно рассматривать как разновидность монопольного потребителя. Это может быть полезно, если вам нужна «активно-пассивная» (или «горячая-теплая») кластеризация ваших экземпляров консюмеров, хотя параллельная работа нескольких консюмеров («активно-активная» или «горячая-горячая») намного более типична, чем консюмеры в режиме ожидания.

По-умолчанию в Spring cloud stream опция, отвечающая за автосоздание партиций при старте нового экземпляра консюмера, отключена.

spring. cloud. stream. kafka. binder. autoAddPartitions

If set to true, the binder creates new partitions if required. If set to false, the binder relies on the partition size of the topic being already configured. If the partition count of the target topic is smaller than the expected value, the binder fails to start.

Default: false.

Минимальным количеством партиций можно управлять из конфигурации продюсера. Так же следует учитывать, что данная опция применима, если включена автосоздание партиций:

spring. cloud. stream. kafka. binder. minPartitionCount

Effective only if autoCreateTopics or autoAddPartitions is set. The global minimum number of partitions that the binder configures on topics on which it produces or consumes data. It can be superseded by the partitionCount setting of the producer or by the value of instanceCount * concurrency settings of the producer (if either is larger) .

Default: 1

Чаще всего, когда мы создаем несколько экземпляров консюмеров, мы делаем это либо для параллельной обработки сообщений, либо для увеличения скорости чтения, либо для повышения устойчивости процесса чтения. Поскольку читать данные из партиции может одновременно только один экземпляр консюмера, то как это достигается в Kafka? Один из способов сделать это — использовать один экземпляр консюмера, чтобы прочитать все сообщения и передать их в пул потоков. Хотя этот подход увеличивает пропускную способность обработки, он увеличивает сложность логики консюмеров и ничего не делает для повышения устойчивости системы чтения. Если один экземпляр консюмера отключается из-за сбоя питания или аналогичного события, то вычитка прекращается.Каноническим способом решения этой проблемы в Kafka является использование бОльшего количества партиций.

Партиционирование

Партиции являются основным механизмом распараллеливания чтения и масштабирования топика за пределы пропускной способности одного экземпляра брокера. Чтобы лучше понять это, давайте рассмотрим ситуацию, когда существует топик с двумя партициями и на этот топик подписывается один консюмер (Рисунок 5). Рисунок 5.

Рисунок 5. Один консюмер читает из нескольких партиций
Рисунок 5. Один консюмер читает из нескольких партиций

Один консюмер читает из нескольких партицийВ этом сценарии консюмеру дается контроль над указателями, соответствующими его group_id в обоих партициях, и начинается чтение сообщений из обеих партиций. Когда в этот топик добавляется дополнительный консюмер для того же group_id, Kafka переназначает (reallocate) одну из партиций с первого на второй консюмер. После чего каждый экземпляр консюмера будет вычитывать из одной партиции топика (Рисунок 6). Чтобы обеспечить обработку сообщений параллельно в 20 потоков, вам потребуется как минимум 20 партиций. Если партиций будет меньше, у вас останутся консюмеры, которым не над чем работать, что описано ранее в обсуждении монопольных консюмеров. Рисунок 6.

Рисунок 6. Два консюмера в одной и той же группе консюмеров читают из разных партиций
Рисунок 6. Два консюмера в одной и той же группе консюмеров читают из разных партиций

Эта схема значительно снижает сложность работы брокера Kafka по сравнению с распределением сообщений, необходимым для поддержки очереди JMS. Здесь не нужно заботится о следующих моментах:

  • Какой консюмер должен получить следующее сообщение, основываясь на циклическом (round-robin) распределении, текущей емкости буферов предварительной выборки или предыдущих сообщениях (как для групп сообщений JMS) .
  • Какие сообщения отправлены каким консюмерам и должны ли они быть доставлены повторно в случае сбоя.

Все, что должен сделать брокер Kafka — это последовательно передавать сообщения консюмеру, когда последний запрашивает их. Однако, требования к распараллеливанию вычитки и повторной отправке неудачных сообщений никуда не деваются — ответственность за них просто переходит от брокера к клиенту. Это означает, что они должны быть учтены в вашем коде.

---------------------------------

Важно помнить, что

По-умолчанию в Spring cloud stream опция, отвечающая за автосоздание партиций при старте нового экземпляра консюмера, отключена.

spring. cloud. stream. kafka. binder. autoAddPartitions

If set to true, the binder creates new partitions if required. If set to false, the binder relies on the partition size of the topic being already configured. If the partition count of the target topic is smaller than the expected value, the binder fails to start.

Default: false.

Минимальным количеством партиций можно управлять из конфигурации продюсера. Так же следует учитывать, что данная опция применима, если включена автосоздание партиций:

spring. cloud. stream. kafka. binder. minPartitionCount

Effective only if autoCreateTopics or autoAddPartitions is set. The global minimum number of partitions that the binder configures on topics on which it produces or consumes data. It can be superseded by the partitionCount setting of the producer or by the value of instanceCount * concurrency settings of the producer (if either is larger) .

Default: 1

----------------------------------

Отправка сообщений

Ответственность за решение, в какую партицию отправить сообщение, возлагается на продюсер этого сообщения. Чтобы понять механизм, с помощью которого это делается, сначала нужно рассмотреть, что именно мы на самом деле отправляем. В то время, как в JMS мы используем структуру сообщения с метаданными (заголовками и свойствами) и телом, содержащим полезную нагрузку (payload) , в Kafka сообщение является парой «ключ-значение». Полезная нагрузка сообщения отправляется, как значение (value) . Ключ, с другой стороны, используется главным образом для партиционирования и должен содержать специфичный для бизнес-логики ключ, чтобы поместить связанные сообщений в ту же партицию.

Рассмотрим пример, когда связанные события должны обрабатываться по порядку одним консюмером:

  • Учетная запись пользователя настроена.
  • Деньги зачисляются на счет.
  • Делается ставка, которая выводит деньги со счета.

Если каждое событие представляет собой сообщение, отправленное в топик, то в этом случае естественным ключом будет идентификатор учетной записи. Когда сообщение отправляется с использованием Kafka Producer API, оно передается функции партиционирования, которая, учитывая сообщение и текущее состояние кластера Kafka, возвращает идентификатор партиции, в которую должно быть отправлено сообщение. Эта функция реализована в Java через интерфейс Partitioner. Этот интерфейс выглядит следующим образом:

interface Partitioner { int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); }

Реализация Partitioner для определения партиции использует по-умолчанию алгоритм хеширования ключа (general-purpose hashing algorithm over the key) или циклический перебор (round-robin) , если ключ не указан. Это значение по-умолчанию работает хорошо в большинстве случаев.

Пример из Spring. io

Написание собственной стратегии партиционирования

Давайте рассмотрим пример, когда вы хотите отправить метаданные вместе с полезной нагрузкой сообщения. Полезная нагрузка в нашем примере — это инструкция для внесения депозита на игровой счет. Инструкция — это то, что мы хотели бы гарантированно не модифицировать при передаче и хотим быть уверены, что только доверенная вышестоящая система может инициировать эту инструкцию. В этом случае отправляющая и принимающая системы согласовывают использование подписи для проверки подлинности сообщения. В обычном JMS мы просто определяем свойство «подпись сообщения» и добавляем его к сообщению. Тем не менее, Kafka не предоставляет нам механизм для передачи метаданных — только ключ и значение. Поскольку значение — это полезная нагрузка банковского перевода (bank transfer payload) , целостность которой мы хотим сохранить, у нас не остается другого выбора, кроме определения структуры данных для использования в ключе. Предполагая, что нам нужен идентификатор учетной записи для партиционирования, так как все сообщения, относящиеся к учетной записи, должны обрабатываться по порядку, мы придумаем следующую структуру JSON:

{ "signature": "541661622185851c248b41bf0cea7ad0", "accountId": "10007865234" }

Поскольку значение подписи будет варьироваться в зависимости от полезной нагрузки, дефолтная стратегия хеширования интерфейса Partitioner не будет надежно группировать связанные сообщения. Поэтому нам нужно будет написать свою собственную стратегию, которая будет анализировать этот ключ и разделять (partition) значение accountId.

Пользовательская стратегия партиционирования должна гарантировать, что все связанные сообщения окажутся в одной партиции. Хотя это кажется простым, но требование может быть усложнено из-за важности упорядочивания связанных сообщений и того, насколько фиксировано количество партиций в топике. Количество партиций в топике может изменяться со временем, так как их можно добавить, если трафик выходит за пределы первоначальных ожиданий. Таким образом, ключи сообщений могут быть связаны с партицией, в которую они были первоначально отправлены, подразумевая часть состояния, которое должно быть распределено между экземплярами продюсера. Другим фактором, который следует учитывать, является равномерность распределения сообщений между партициями. Как правило, ключи не распределяются равномерно по сообщениям, и хеш-функции не гарантируют справедливое распределение сообщений для небольшого набора ключей. Важно отметить, что, как бы вы ни решили разделить сообщения, сам разделитель, возможно, придется использовать повторно.

Соглашения по продюсеру

Партиционирование — это не единственное, что необходимо учитывать при отправке сообщений. Давайте рассмотрим методы send () класса Producer в Java API:

Future < RecordMetadata > send(ProducerRecord < K, V > record); Future < RecordMetadata > send(ProducerRecord < K, V > record, Callback callback);

Следует сразу отметить, что оба метода возвращают Future, что указывает на то, что операция отправки не выполняется немедленно. В результате получается, что сообщение (ProducerRecord) записывается в буфер отправки для каждой активной партиции и передается брокеру фоновым потоком в библиотеке клиента Kafka. Хотя это делает работу невероятно быстрой, это означает, что неопытно написанное приложение может потерять сообщения, если его процесс будет остановлен. Как всегда, есть способ сделать операцию отправки более надежной за счет производительности. Размер этого буфера можно установить в 0, и поток отправляющего приложения будет вынужден ждать, пока передача сообщения брокеру не будет завершена, следующим образом:

RecordMetadata metadata = producer.send(record).get();

Только добавление. get() делает код синхронным, т. к. мы блокируем поток до его завершения. Как бы повышаем надежность, но теряем в производительности. Но есть другие, более правильные, способы для обработки ответа, например, использование Callback:

future.addCallback(new ListenableFutureCallback<Object>() { @Override public void onSuccess(Object result) { // Код для успешной отправки } @Override public void onFailure(Throwable ex) { // Код для неуспешной отправки } });

Знание и использование всех вышеперечисленных приемов позволит построить приложение, в котором брокер сообщений не будет узким местом.

22
Начать дискуссию