{"id":13839,"url":"\/distributions\/13839\/click?bit=1&hash=310caf1329692463026b8043ff9088d52dd6a03c2dd0a57cf9acc31d860b46e9","title":"\u041d\u043e\u0432\u044b\u0439 \u043f\u043e\u0434\u043a\u0430\u0441\u0442 \u043f\u0440\u043e \u0431\u0438\u0437\u043d\u0435\u0441-\u043f\u0440\u043e\u0442\u0438\u0432\u043e\u0441\u0442\u043e\u044f\u043d\u0438\u044f \u043c\u0438\u0440\u043e\u0432\u044b\u0445 \u0431\u0440\u0435\u043d\u0434\u043e\u0432","buttonText":"\u041f\u043e\u0434\u0440\u043e\u0431\u043d\u0435\u0435","imageUuid":"b11f7ca0-f0f3-5198-bc97-7a4f1114f6f6","isPaidAndBannersEnabled":false}

Приземление потоков сырых данных из Kafka в ClickHouse

Это статья - памятка, в ней описывается паттерн приземления потока сырых данных (логов Nginx) из Kafka в ClickHouse. Так как логи бывают разного формата, то уместно ожидать, что местами решение будет носить частный характер.

Постоянное хранилище логов

CREATE TABLE db_name.nginx_log ON CLUSTER cluster_name ( id UUID, requested DateTime, http_method String, uri String, ip String ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/db_name/nginx_log', '{replica_name}') PARTITION BY toYYYYMMDD(requested) ORDER BY requested TTL requested + INTERVAL 1 YEAR -- срок хранения 1 год

Временное хранилище логов

Хранилище будет полезно для проверки работы, также, в нем будут оседать нераспарсенные логи.

CREATE TABLE db_name.nginx_log_temp ON CLUSTER cluster_name ( id UUID, invalid UInt8, requested Nullable(DateTime), http_method Nullable(String), uri Nullable(String), ip Nullable(String), raw String, delete_after DateTime ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/db_name/nginx_log', '{replica_name}') ORDER BY delete_after TTL delete_after

Потребитель Kafka

CREATE TABLE db_name.nginx_log_kafka_consumer ON CLUSTER cluster_name ( row String ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'topic_name', kafka_group_name = 'group_name', kafka_format = 'LineAsString', kafka_row_delimiter = '\n', kafka_num_consumers = 1

Транслятор потока во временное хранилище

CREATE MATERIALIZED VIEW db_name.nginx_log_kafka_transfer TO db_name.nginx_log_temp ON CLUSTER cluster_name AS WITH lower(row) as raw, generateUUIDv4() as id, nullif(replaceRegexpOne(raw, '.*?"timestamp" *: *"([^" ]+?)[Tt ]([^" ]+?)".*', '\1 \2'), raw) as requested_str, parseDateTimeBestEffortOrNull(requested_str) as requested, nullif(replaceRegexpOne(raw, '.*?"request_method" *: *"([^" ]+?)".*', '\1'), raw) as http_method, nullif(replaceRegexpOne(raw, '.*?"uri" *: *"([^" ]+?)".*', '\1'), raw) as uri, nullif(replaceRegexpOne(raw, '.*?"remote_addr" *: *"([^" ]+?)".*', '\1'), raw) as ip, toUInt8(requested is null OR http_method is null OR uri is null OR ip is null) as invalid, if(invalid, now() + INTERVAL 1 YEAR, now() + INTERVAL 1 DAY) as delete_after -- срок хранения 1 день/год SELECT id, invalid, requested, http_method, uri, ip, raw, delete_after FROM db_name.nginx_log_kafka_consumer

* replaceRegexpOne - позволяет проложить маршрут к любому фрагменту по постоянным маркерам.

Транслятор потока в постоянное хранилище

CREATE MATERIALIZED VIEW db_name.nginx_log_temp_transfer TO db_name.nginx_log ON CLUSTER cluster_name AS SELECT id, requested, http_method, uri, ip FROM db_name.nginx_log_temp WHERE not invalid

* для более равномерного распределения поток можно отправить в распределенную таблицу (nginx_log_distributed), если она есть.

Распределенные таблицы

На случай, если решение тиражировано на несколько серверов кластера.

CREATE TABLE db_name.nginx_log_total ON CLUSTER cluster_name AS db_name.nginx_log ENGINE = Distributed('cluster_name', 'db_name', 'nginx_log', rand()) CREATE TABLE db_name.nginx_log_temp_total ON CLUSTER cluster_name AS db_name.nginx_log_temp ENGINE = Distributed('cluster_name', 'db_name', 'nginx_log', rand())
0
Комментарии
Читать все 0 комментариев
null