Обработка потоковых данных с помощью ksqlDB

Обработка потоковых данных с помощью ksqlDB

Представим следующую задачу:

  • У нас есть множество датчиков, которые с определенным интервалом времени бросают сообщение в топик kafka с определенной информацией - например температура и влажность;
  • Нам необходимо вывести на какой-либо монитор текущие состояние датчиков, с периодическим обновлением, нужно показывать среднюю температуру за несколько интервалов;
  • Необходимо реагировать на превышение средней температуры определенного порогового значения;

Эту задачу можно решить с помощью ksqlDB - платформы для обработки потоковых данных на основе Apache Kafka. Это потоковая SQL-база данных для обработки и анализа данных в реальном времени, разработанная Confluent на основе Apache Kafka.

Для чего используется ksqlDB

  • Обработка событий в реальном времени - позволяет выполнять сложные аналитические запросы на потоковых данных, получая актуальную информацию о состоянии систем и процессов (мониторинг, аналитика).
  • Обнаружение аномалий - выявление в реальном времени аномальных выбросов в данных для выявления чего-то (мошенничество, сбои).
  • Трансформация данных - преобразование данных между топиками/стримами, объединение и/или обогащение их.
  • Материализованные представления - аггрегация и быстрый доступ к данным для быстрого доступа к агрегированным данным.

Развертывание

ksqlDB работает как сервер, который можно развернуть:

  • В виде standalone-сервера
  • В кластере (для горизонтального масштабирования)
  • В Docker-контейнере
  • В облачных сервисах (Confluent Cloud)

Для нашей задачи мы развернем тестовыю kafka и ksqlDB через docker-compose:

version: '3.8' services: # kafka configuration ksqldb-server: image: confluentinc/ksqldb-server:0.28.2 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker - schema-registry ports: - "8088:8088" - "8083:8083" environment: KSQL_CONFIG_DIR: "/etc/ksql" KSQL_BOOTSTRAP_SERVERS: "broker:29092" KSQL_HOST_NAME: ksqldb-server KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_CACHE_MAX_BYTES_BUFFERING: 0 KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_KSQL_CONNECT_URL: "http://connect:8083" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true'

Если необходим доступ к интерфейсу командной строки для взаимодействия с ksqlDB, то существует приложение, которое можно поднять сервис в docker-compose - ksqldb-cli.

Полный файл docker-compose.yaml с тестовой конфигурацией kafka, ksqlDB, schema-registry, control-center и ksqldb-cli:

version: '3.8' services: broker: image: confluentinc/cp-kafka:7.9.1 hostname: broker container_name: broker ports: - "9092:9092" - "29092:29092" - "9093:9093" # для контроллера environment: KAFKA_NODE_ID: 1 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' volumes: - kafka-data:/var/lib/kafka/data schema-registry: image: confluentinc/cp-schema-registry:7.4.1 hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://broker:29092" SCHEMA_REGISTRY_DEBUG: "true" ksqldb-server: image: confluentinc/ksqldb-server:0.28.2 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker - schema-registry ports: - "8088:8088" - "8083:8083" environment: KSQL_CONFIG_DIR: "/etc/ksql" KSQL_BOOTSTRAP_SERVERS: "broker:29092" KSQL_HOST_NAME: ksqldb-server KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_CACHE_MAX_BYTES_BUFFERING: 0 KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_KSQL_CONNECT_URL: "http://connect:8083" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' ksqldb-cli: image: confluentinc/ksqldb-cli:0.28.2 container_name: ksqldb-cli depends_on: - ksqldb-server entrypoint: /bin/sh tty: true environment: KSQL_CONFIG_DIR: "/etc/ksql" control-center: image: confluentinc/cp-enterprise-control-center:7.4.1 hostname: control-center container_name: control-center depends_on: - broker - schema-registry - ksqldb-server ports: - "9021:9021" environment: CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' CONTROL_CENTER_CONNECT_CONNECT_CLUSTER: 'ksqldb-server:8083' CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088" CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088" CONTROL_CENTER_REPLICATION_FACTOR: 1 CONTROL_CENTER_STREAMS_NUM_STANDBY_REPLICAS: 0 volumes: kafka-data:

Запустим все наши сервисы через docker-compose:

docker compose up -d

Эмулятор датчиков

Для демострации работы создал небольшой python script, который каждые 5 секунд будет слать случайные в определенном диапазоне данные по 5 датчикам. Данные генерятся случайным образом:

""" Data generatir for sensors""" import json import random import time from uuid import uuid4 from confluent_kafka import Producer # конфигурация Producer'а config = { 'bootstrap.servers': 'localhost:9092', 'client.id': 'sensors-producer' } NUM_SENSORS = 5 TOPIC = "sensors" def generate_data(sensor_id: int) -> dict: """Generate data for sensor""" return { 'sensor_id': sensor_id, 'temperature': random.uniform(20.0, 30.0), 'humidity': random.uniform(30.0, 50.0), 'timestamp': int(time.time()) } def delivery_report(err, msg): """Callback """ if err is not None: print(f"Delivery error: {err}") else: print( f"Message is deliverred to topic {msg.topic()} [partition {msg.partition()}]") def send_message(producer: Producer, topic: str, message: dict): """Send message to topic""" json_message = json.dumps(message).encode( 'utf-8') if message is not None else None producer.produce(topic=topic, key=str(uuid4()), value=json_message, callback=delivery_report) def main(): """The main function""" producer = Producer(config) try: while True: for sensor_id in range(1, NUM_SENSORS + 1): # генерируем случайные данные sensor_data = generate_data(sensor_id) # отправляем данные в Kafka send_message(producer, TOPIC, sensor_data) # пауза между отправками time.sleep(10) except KeyboardInterrupt: print('Stopped.') if __name__ == "__main__": main()

Каждый из 5 датчиков будет отсылать информацию такого вида:

{ "sensor_id": 1, "temperature": 22.3654, "humidity": 45.54948, "timestamp": 1747295100000 }

Прежде чем запускать скрипт, необходимо установить нужные библиотеки:

pip install ksqldb confluent-kafka pandas

Запустим наш эмулятор:

python sensors_generator.py

Подключаемся к ksqlDB и создаем Stream

Импортируем библиотеки, которые нам понадобятся. Для создания и управления стримами и таблицами ksqlDB, я буду использовать KSQLdbClient из библиотеки ksqldb:

from confluent_kafka import Producer import json from ksqldb import KSQLdbClient import random import time from uuid import uuid4 import nest_asyncio

Но эти же запросы можно запускать через утилиту командной строки, достаточно только подсоединиться через команду:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Подключимся клиентом к сервису

client = KSQLdbClient("http://localhost:8088")

Теперь выполним запрос по созданию Stream, который будет обрабатывать наш топик с данными датчиков:

CREATE STREAM `sensor_stream` ( `sensor_id` INTEGER, `temperature` DOUBLE, `humidity` DOUBLE, `timestamp` BIGINT ) WITH ( KAFKA_TOPIC = 'sensors', VALUE_FORMAT = 'JSON' );

Важно

ksqlDB чувствительна к регистру, поэтому все поля и названия таблиц/стримов выделены в одинарую обратную кавычку (`)

Здесь `KAFKA_TOPIC` имя топика из которого мы будем создавать SREAM, а `VALUE_FORMAT` нужен для корректного парсинга данных. Помимо JSON, можно использовать plain text, а также в формате AVRO с явным указанием схемы (для этого и поднимаем schema-registry), что предпочтительнее использовать в production ready системах.

CREATE_SENSORS_STREAM = """ CREATE STREAM `sensor_stream` ( `sensor_id` INTEGER, `temperature` DOUBLE, `humidity` DOUBLE, `timestamp` BIGINT ) WITH ( KAFKA_TOPIC = 'sensors', VALUE_FORMAT = 'JSON' ); """ client.ksql(CREATE_SENSORS_STREAM)
[{'@type': 'currentStatus', 'statementText': "CREATE STREAM `sensor_stream` (`sensor_id` INTEGER, `temperature` DOUBLE, `humidity` DOUBLE, `timestamp` BIGINT) WITH (KAFKA_TOPIC='sensors', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", 'commandId': 'stream/`sensor_stream`/create', 'commandStatus': {'status': 'SUCCESS', 'message': 'Stream created', 'queryId': None}, 'commandSequenceNumber': 2, 'warnings': []}]

Агрегация данных по датчикам

Следующий шаг - создадим таблицу, которая будет агрегировать данные по датчикам и находить среднее значение за определенный промежуток времени. Если вам не нужно постоянное материализованное представление, все тоже самое можно сделать используя простой SELECT запрос без создания таблички. Данные будут агрегироваться с помощью GROUP BY и функции AVG(), полный текст запроса:

CREATE TABLE `avg_5min` AS SELECT `sensor_id`, AVG(`temperature`) AS `avg_temp`, AVG(`humidity`) AS `avg_hum`, WINDOWSTART AS `window_start`, WINDOWEND AS `window_end` FROM `sensor_stream` WINDOW TUMBLING (SIZE 5 MINUTES) GROUP BY `sensor_id` EMIT CHANGES;
AVERAGE_5MIN = """ CREATE TABLE `avg_5min` AS SELECT `sensor_id`, AVG(`temperature`) AS `avg_temp`, AVG(`humidity`) AS `avg_hum`, WINDOWSTART AS `window_start`, WINDOWEND AS `window_end` FROM `sensor_stream` WINDOW TUMBLING (SIZE 5 MINUTES) GROUP BY `sensor_id` EMIT CHANGES; """ client.ksql(AVERAGE_5MIN)
[{'@type': 'currentStatus', 'statementText': "CREATE TABLE `avg_5min` WITH (KAFKA_TOPIC='avg_5min', PARTITIONS=1, REPLICAS=1) AS SELECT\n `sensor_stream`.`sensor_id` `sensor_id`,\n AVG(`sensor_stream`.`temperature`) `avg_temp`,\n AVG(`sensor_stream`.`humidity`) `avg_hum`,\n WINDOWSTART `window_start`,\n WINDOWEND `window_end`\nFROM `sensor_stream` `sensor_stream`\nWINDOW TUMBLING ( SIZE 5 MINUTES ) \nGROUP BY `sensor_stream`.`sensor_id`\nEMIT CHANGES;", 'commandId': 'table/`avg_5min`/create', 'commandStatus': {'status': 'SUCCESS', 'message': 'Created query with ID CTAS_AVG_5MIN_3', 'queryId': 'CTAS_AVG_5MIN_3'}, 'commandSequenceNumber': 4, 'warnings': []}]

Теперь, мы можем получить текущие данные сенсоров, усредненных за последниии 5 минут, достаточно выполнить обычный sql запрос:

SELECT * FROM `avg_5min`;

Либо, если мы хотим получать данные в режиме реального времени, то такой запрос:

SELECT * FROM `avg_5min` EMIT CHANGES;
import pandas as pd nest_asyncio.apply() query_result = client.query_sync("SELECT * FROM `avg_5min`;") df = pd.DataFrame(query_result[1:], columns=query_result[0]["columnNames"]) print(df.to_markdown())
Обработка потоковых данных с помощью ksqlDB

Если нам необходимо обрабатывать потоковые данные, ksqlDB предоставляет HTTP API, с помощью которого можно подписаться на потоковые данные. Можно использовать web-socket либо через клиент:

query = """ SELECT * FROM `avg_5min` EMIT CHANGES; """ # Запрос в реальном времени async for row in client.query_async('SELECT * FROM `avg_5min` EMIT CHANGES;'): print(row)

Фильтрация аномалий

Можно добавить условие для отсева некорректных значений (например, температура < 0 или > 50):

CREATE STREAM `filtered_sensors` AS SELECT * FROM `sensor_stream` WHERE `temperature` < 0 OR `temperature` > 50;
FILTERED_SENSORS = """ CREATE STREAM `filtered_sensors` AS SELECT * FROM `sensor_stream` WHERE `temperature` < 0 OR `temperature` > 50; """ client.ksql(FILTERED_SENSORS)
[{'@type': 'currentStatus', 'statementText': "CREATE STREAM `filtered_sensors` WITH (KAFKA_TOPIC='filtered_sensors', PARTITIONS=1, REPLICAS=1) AS SELECT *\nFROM `sensor_stream` `sensor_stream`\nWHERE ((`sensor_stream`.`temperature` < 0) OR (`sensor_stream`.`temperature` > 50))\nEMIT CHANGES;", 'commandId': 'stream/`filtered_sensors`/create', 'commandStatus': {'status': 'SUCCESS', 'message': 'Created query with ID CSAS_FILTERED_SENSORS_5', 'queryId': 'CSAS_FILTERED_SENSORS_5'}, 'commandSequenceNumber': 6, 'warnings': []}]

Если закинуть неправильные данные в топик, то мы тут же можем получить аномалии, которые возникли. Закинем в топик аномальные значения:

from sensors_generator import generate_data, config, send_message, TOPIC sensor_id = 1 sensor_data = generate_data(sensor_id) sensor_data["temperature"] = 100.0 producer = Producer(config) # отправляем данные в Kafka send_message(producer, TOPIC, sensor_data)
Message is deliverred to topic sensors [partition 0]

Получим аномалии:

SELECT * FROM `filtered_sensors`;
query_result = client.query_sync("SELECT * FROM `filtered_sensors`;") df = pd.DataFrame(query_result[1:], columns=query_result[0]["columnNames"]) print(df.to_markdown())
Обработка потоковых данных с помощью ksqlDB

Заключение

Это отличный потоковый движок с SQL-интерфейсом, который позволяет работать с потоками данных в реальном времени так же просто, как с обычными базами данных. Преимущества ksqlDB:

  • Знакомый SQL-синтаксис - практически каждый разработчик, дата аналитик знает основной синтаксис SQL.
  • Потоковая обработка - работа с данными “на лету”, без задержек.
  • Интеграция с Kafka - работает прямо с топиками kafka, можно вставлять данные в топик через INSERT синтаксис sql.
  • Масштабируемость - легко растет вместе с нашими потребностями.
  • Быстрее время вывода продукта на рынок.

Канал автора по ссылке

Начать дискуссию