Обработка потоковых данных с помощью ksqlDB
Представим следующую задачу:
- У нас есть множество датчиков, которые с определенным интервалом времени бросают сообщение в топик kafka с определенной информацией - например температура и влажность;
- Нам необходимо вывести на какой-либо монитор текущие состояние датчиков, с периодическим обновлением, нужно показывать среднюю температуру за несколько интервалов;
- Необходимо реагировать на превышение средней температуры определенного порогового значения;
Эту задачу можно решить с помощью ksqlDB - платформы для обработки потоковых данных на основе Apache Kafka. Это потоковая SQL-база данных для обработки и анализа данных в реальном времени, разработанная Confluent на основе Apache Kafka.
Для чего используется ksqlDB
- Обработка событий в реальном времени - позволяет выполнять сложные аналитические запросы на потоковых данных, получая актуальную информацию о состоянии систем и процессов (мониторинг, аналитика).
- Обнаружение аномалий - выявление в реальном времени аномальных выбросов в данных для выявления чего-то (мошенничество, сбои).
- Трансформация данных - преобразование данных между топиками/стримами, объединение и/или обогащение их.
- Материализованные представления - аггрегация и быстрый доступ к данным для быстрого доступа к агрегированным данным.
Развертывание
ksqlDB работает как сервер, который можно развернуть:
- В виде standalone-сервера
- В кластере (для горизонтального масштабирования)
- В Docker-контейнере
- В облачных сервисах (Confluent Cloud)
Для нашей задачи мы развернем тестовыю kafka и ksqlDB через docker-compose:
Если необходим доступ к интерфейсу командной строки для взаимодействия с ksqlDB, то существует приложение, которое можно поднять сервис в docker-compose - ksqldb-cli.
Полный файл docker-compose.yaml с тестовой конфигурацией kafka, ksqlDB, schema-registry, control-center и ksqldb-cli:
Запустим все наши сервисы через docker-compose:
Эмулятор датчиков
Для демострации работы создал небольшой python script, который каждые 5 секунд будет слать случайные в определенном диапазоне данные по 5 датчикам. Данные генерятся случайным образом:
Каждый из 5 датчиков будет отсылать информацию такого вида:
Прежде чем запускать скрипт, необходимо установить нужные библиотеки:
Запустим наш эмулятор:
Подключаемся к ksqlDB и создаем Stream
Импортируем библиотеки, которые нам понадобятся. Для создания и управления стримами и таблицами ksqlDB, я буду использовать KSQLdbClient из библиотеки ksqldb:
Но эти же запросы можно запускать через утилиту командной строки, достаточно только подсоединиться через команду:
Подключимся клиентом к сервису
Теперь выполним запрос по созданию Stream, который будет обрабатывать наш топик с данными датчиков:
Важно
ksqlDB чувствительна к регистру, поэтому все поля и названия таблиц/стримов выделены в одинарую обратную кавычку (`)
Здесь `KAFKA_TOPIC` имя топика из которого мы будем создавать SREAM, а `VALUE_FORMAT` нужен для корректного парсинга данных. Помимо JSON, можно использовать plain text, а также в формате AVRO с явным указанием схемы (для этого и поднимаем schema-registry), что предпочтительнее использовать в production ready системах.
Агрегация данных по датчикам
Следующий шаг - создадим таблицу, которая будет агрегировать данные по датчикам и находить среднее значение за определенный промежуток времени. Если вам не нужно постоянное материализованное представление, все тоже самое можно сделать используя простой SELECT запрос без создания таблички. Данные будут агрегироваться с помощью GROUP BY и функции AVG(), полный текст запроса:
Теперь, мы можем получить текущие данные сенсоров, усредненных за последниии 5 минут, достаточно выполнить обычный sql запрос:
Либо, если мы хотим получать данные в режиме реального времени, то такой запрос:
Если нам необходимо обрабатывать потоковые данные, ksqlDB предоставляет HTTP API, с помощью которого можно подписаться на потоковые данные. Можно использовать web-socket либо через клиент:
Фильтрация аномалий
Можно добавить условие для отсева некорректных значений (например, температура < 0 или > 50):
Если закинуть неправильные данные в топик, то мы тут же можем получить аномалии, которые возникли. Закинем в топик аномальные значения:
Получим аномалии:
Заключение
Это отличный потоковый движок с SQL-интерфейсом, который позволяет работать с потоками данных в реальном времени так же просто, как с обычными базами данных. Преимущества ksqlDB:
- Знакомый SQL-синтаксис - практически каждый разработчик, дата аналитик знает основной синтаксис SQL.
- Потоковая обработка - работа с данными “на лету”, без задержек.
- Интеграция с Kafka - работает прямо с топиками kafka, можно вставлять данные в топик через INSERT синтаксис sql.
- Масштабируемость - легко растет вместе с нашими потребностями.
- Быстрее время вывода продукта на рынок.
Канал автора по ссылке