阿里云 Flink 原生支持 SLS Connector,可以将 SLS 的 Logstore 作为源表或结果表使用。配置 SLS 源表的步骤如下:
SLS 侧准备:开通 SLS 服务,创建 Project 和 Logstore,准备具有消费 Logstore 权限的 AccessKey ID/Secret。
Flink 侧创建 SQL 作业:在阿里云 Flink 控制台创建空白 SQL 流作业草稿。
建表语句配置:
CREATE TEMPORARY TABLE sls_input(
request_uri STRING,
status STRING,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` STRING METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
) WITH (
'connector' = 'sls',
'endpoint' = 'cn-beijing-intranet.log.aliyuncs.com',
'accessId' = '${ak}',
'accessKey' = '${sk}',
'starttime' = '2024-01-21 00:00:00',
'project' = '${project}',
'logstore' = 'test-nginx-log'
);
关键参数包括:endpoint 建议使用同地域 SLS 私网地址,starttime 指定消费起始时间,query 可选配置 SPL 语句实现下推。日志字段映射为 Flink 表字段后即可用 SQL 进行实时分析。