Приземление потоков сырых данных из Kafka в ClickHouse
Это статья - памятка, в ней описывается паттерн приземления потока сырых данных (логов Nginx) из Kafka в ClickHouse. Так как логи бывают разного формата, то уместно ожидать, что местами решение будет носить частный характер.
406
просмотров
Постоянное хранилище логов
CREATE TABLE db_name.nginx_log
ON CLUSTER cluster_name
(
id UUID,
requested DateTime,
http_method String,
uri String,
ip String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/db_name/nginx_log', '{replica_name}')
PARTITION BY toYYYYMMDD(requested)
ORDER BY requested
TTL requested + INTERVAL 1 YEAR -- срок хранения 1 год
Временное хранилище логов
Хранилище будет полезно для проверки работы, также, в нем будут оседать нераспарсенные логи.
CREATE TABLE db_name.nginx_log_temp
ON CLUSTER cluster_name
(
id UUID,
invalid UInt8,
requested Nullable(DateTime),
http_method Nullable(String),
uri Nullable(String),
ip Nullable(String),
raw String,
delete_after DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/db_name/nginx_log', '{replica_name}')
ORDER BY delete_after
TTL delete_after
Потребитель Kafka
CREATE TABLE db_name.nginx_log_kafka_consumer
ON CLUSTER cluster_name
(
row String
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic_name',
kafka_group_name = 'group_name',
kafka_format = 'LineAsString',
kafka_row_delimiter = '\n',
kafka_num_consumers = 1
Транслятор потока во временное хранилище
CREATE MATERIALIZED VIEW db_name.nginx_log_kafka_transfer TO db_name.nginx_log_temp
ON CLUSTER cluster_name
AS
WITH
lower(row) as raw,
generateUUIDv4() as id,
nullif(replaceRegexpOne(raw, '.*?"timestamp" *: *"([^" ]+?)[Tt ]([^" ]+?)".*', '\1 \2'), raw) as requested_str,
parseDateTimeBestEffortOrNull(requested_str) as requested,
nullif(replaceRegexpOne(raw, '.*?"request_method" *: *"([^" ]+?)".*', '\1'), raw) as http_method,
nullif(replaceRegexpOne(raw, '.*?"uri" *: *"([^" ]+?)".*', '\1'), raw) as uri,
nullif(replaceRegexpOne(raw, '.*?"remote_addr" *: *"([^" ]+?)".*', '\1'), raw) as ip,
toUInt8(requested is null OR http_method is null OR uri is null OR ip is null) as invalid,
if(invalid, now() + INTERVAL 1 YEAR, now() + INTERVAL 1 DAY) as delete_after -- срок хранения 1 день/год
SELECT
id, invalid, requested, http_method, uri, ip, raw, delete_after
FROM
db_name.nginx_log_kafka_consumer
* replaceRegexpOne - позволяет проложить маршрут к любому фрагменту по постоянным маркерам.
Транслятор потока в постоянное хранилище
CREATE MATERIALIZED VIEW db_name.nginx_log_temp_transfer TO db_name.nginx_log
ON CLUSTER cluster_name
AS
SELECT
id, requested, http_method, uri, ip
FROM
db_name.nginx_log_temp
WHERE
not invalid
* для более равномерного распределения поток можно отправить в распределенную таблицу (nginx_log_distributed), если она есть.
Распределенные таблицы
На случай, если решение тиражировано на несколько серверов кластера.
CREATE TABLE db_name.nginx_log_total
ON CLUSTER cluster_name
AS db_name.nginx_log
ENGINE = Distributed('cluster_name', 'db_name', 'nginx_log', rand())
CREATE TABLE db_name.nginx_log_temp_total
ON CLUSTER cluster_name
AS db_name.nginx_log_temp
ENGINE = Distributed('cluster_name', 'db_name', 'nginx_log', rand())
0
Комментарии