在阿里云 Flink 的 SLS Connector 中,通过 query 参数配置 SPL 语句即可实现过滤下推。具体步骤如下:
- 在 Flink 控制台创建 SQL 流作业草稿。
- 创建临时表时,在
WITH子句的query参数中填写 SPL 过滤语句。 - 注意在 Flink SQL 中字符串需要使用英文单引号转义。
示例代码:
CREATE TEMPORARY TABLE sls_input(
request_uri STRING,
slbid STRING,
status STRING
) WITH (
'connector' = 'sls',
'endpoint' = 'cn-beijing-intranet.log.aliyuncs.com',
'accessId' = '${ak}',
'accessKey' = '${sk}',
'project' = '${project}',
'logstore' = 'test-nginx-log',
'query' = '* | where slbid = ''slb-01'''
);
配置后,SLS 服务端会先执行 SPL 过滤,只有符合条件的数据才会传输到 Flink,不符合条件的数据在源头就被过滤掉。