Проброс событий в ClickHouse с использованием Vector
От устройства снаружи ИС на фронтальные балансировщики приходят сообщения в локейшн
/<device_location>/status
Файл журнала
/var/log/<device_access_log_file_path>.status.access.log
разбирается с помощью vector, пересылающий события на внутренний балансировщик. Между фронтальными и внутренними балансировщиками есть сетевая связность по порту TCP
<port>
Конфиг vector на фронтальных балансировщиках: Показать
sources: device-status: type: "file" max_line_bytes: 1638400 include: - /var/log/<device_access_log_file_path>.status.access.log transforms: device-status_filter: type: "filter" inputs: - device-status condition: .message != "" device-status_transform: type: "remap" inputs: - device-status_filter source: | .message = parse_jsonI(.message) .message.traffic_source = .host . = .message sinks: sink_clickhouse: type: "clickhouse" inputs: - device-status_transform endpoint: "http://<inner_BGP_IP>:<port>" format: "json_as_string" healthcheck: enabled: false auth: strategy: "basic" user: "<clickhouse_DB_user>" password: "<clickhouse_DB_password>" database: "<clickhouse_DB>" table: "<clickhouse_table>"
Конфиг внутреннего балансировщика, пробрасывающего события в clickhouse: Показать
stream { include /etc/<path_to_balancer_config>/log-format-s.conf; upstream clickhouse8123 { server <clickhouse_BGP_IP>:8123; server <clickhouse_node1_IP>:8123 backup; server <clickhouse_node2_IP>:8123 backup; } server { listen <port>; proxy_pass clickhouse8123; access_log /var/log/<access_log_file_path> main_json_mini_s; error_log /var/log/<error_log_file_path>; } }
Команды clickhouse для создания БД, таблиц и представлений: Показать
# Создание БД CREATE DATABASE vector ON CLUSTER <cluster_name>; DROP TABLE IF EXISTS vector.device_status_logs ON CLUSTER <cluster_name>; # Создание таблицы CREATE TABLE vector.device_status_logs ON CLUSTER <cluster_name> ( 'message' String ) ENGINE = ReplicatedReplacingMergeTree('/сlickhouse/{cluster}/tables/validator_status_logs','{replica}') ORDER BY tuple(); DROP VIEW IF EXISTS vector.device_status_logs_view ON CLUSTER <cluster_name>; # Создание представления CREATE MATERIALIZED VIEW vector.device_status_logs_view ON CLUSTER <cluster_name> ( 'time_stamp' DateTime, 'req' String, 'req_body' String ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{cluster}/tables/device_status_logs_view','{replica}') ORDER BY time_stamp SETTINGS index_granularity = 8192 AS SELECT parseDateTimeBestEffortOrNull(JSONExtractString(message, 'timestamp')) AS time_stamp, simpleJSONExtractRaw(message, 'req') AS req, simpleJSONExtractRaw(message, 'req_body') AS req_body FROM ( SELECT message FROM vector.device_status_logs );