背景
日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入SLS进行存储、分析;阿里云Flink是阿里云基于Apache Flink构建的大数据分析平台,在实时数据分析、风控检测等场景应用广泛。阿里云Flink原生支持阿里云日志服务SLS的Connector,可以在阿里云Flink平台将SLS作为源表或者结果表使用。
在阿里云Flink配置SLS作为源表时,默认会消费SLS的Logstore数据进行动态表的构建,在消费的过程中,可以指定起始时间点,消费的数据也是指定时间点以后的全量数据;在特定场景中,往往只需要对某类特征的日志或者日志的某些字段进行分析处理,此类需求可以通过Flink SQL的WHERE和SELECT完成,这样做有两个问题:1. Connector 从源头拉取了过多不必要的数据行或者数据列造成了网络的开销,2. 这些不必要的数据需要在Flink中进行过滤投影计算,这些清洗工作并不是数据分析的关注的重点,造成了计算的浪费;对于这种场景,有没有更好的办法呢?
答案是肯定的,SLS 推出了SPL语言,可以高效的对日志数据的清洗,加工。这种能力也集成在了日志消费场景,包括阿里云Flink中SLS Connector,通过配置SLS SPL即可实现对数据的清洗规则,在减少网络传输的数据量的同时,也可以减少Flink端计算消耗。
接下来对SPL及SPL在阿里云Flink SLS Connector中应用进行介绍及举例。
SLS SPL介绍

SLS SPL是日志服务推出的一款针对弱结构化的高性能日志处理语言,可以同时在Logtail端、查询扫描、流式消费场景使用,具有交互式、探索式、使用简洁等特点;
SPL基本语法如下:
<data-source>
| <spl-cmd> -option=<option> -option ... <expression>, ... as <output>, ...
| <spl-cmd> ...
| <spl-cmd> ...<spl-cmd>是SPL指令,支持行过滤、列扩展、列裁剪、正则取值、字段投影、数值计算、JSON、CSV等半结构化数据处理,具体参考 SPL指令介绍,具体来说包括:
结构化数据SQL计算指令:支持行过滤、列扩展、数值计算、SQL函数调用
- where 通过SQL表达式计算结果产生新字段
- extend 根据SQL表达式计算结果过滤数据条目
*
| extend latency=cast(latency as BIGINT)
| where status='200' AND latency>100
字段操作指令:支持字段投影、字段重名、列裁剪
- project 保留与给定模式相匹配的字段、重命名指定字段
- project-away 保留与给定模式相匹配的字段、重命名指定字段
- project-rename 重命名指定字段,并原样保留其他所有字段
*
| project-away -wildcard "__tag__:*"
| project-rename __source__=remote_addr
非结构化数据提取指令:支持JSON、正则、CSV等非结构化字段值处理
- parse-regexp 提取指定字段中的正则表达式分组匹配信息
- parse-json 提取指定字段中的第一层JSON信息
- parse-csv 提取指定字段中的CSV格式信息
*
| parse-csv -delim='^_^' content as time, body
| parse-regexp body, '(\S+)\s+(\w+)' as msg, userSPL在中Flink SLS Connector中的原理介绍
阿里云Flink支持SLS Connector,通过SLS Connector实时拉取SLS中Logstore的数据,分析后的数据也可以实时写入SLS,作为一个高性能计算引擎,Flink SQL也在越来越广泛的应用在Flink计算中,借助SQL语法可以对结构化的数据进行分析。
在SLS Connector中,可以配置日志字段为Flink SQL中的Table字段,然后基于SQL进行数据分析;在未支持SPL配置之前,SLS Connector会实时消费全量的日志数据到Flink计算平台,当前消费方式有如下特点:
- 在Flink中计算的往往不需要所有的日志行,比如在安全场景中,可能仅需要符合某种特征的数据,需要进行日志进行过滤,事实上不需要的日志行也会被拉取,造成网络带宽的浪费。
- 在Flink中计算的一般是特定的字段列,比如在Logstore中有30个字段,真正需要在Flink计算的可能仅有10个字段,全字段的拉取造成了网络带宽的浪费。
在以上场景中,可能会增加并不需要的网络流量和计算开销,基于这些特点,SLS 将SPL的能力集成到SLS Connector的新版本中,可以实现数据在到达Flink之前已经进行了行过滤和列裁剪,这些预处理能力内置在SLS服务端,可以达到同时节省网络流量与Flink计算(过滤、列裁剪)开销的目的。
原理对比
- 未配置SPL语句时:Flink会拉取SLS的全量日志数据(包含所有列、所有行)进行计算,如图1。
- 配置SPL语句时:SPL可以对拉取到的数据如果SPL语句包含过滤及列裁剪等,Flink拉取到的是进行过滤和列裁剪后部分数据进行计算,如图2。

在Flink中使用SLS SPL
接下来以一个Nginx日志为例,来介绍基于SLS SPL的能力来使用Flink。为了便于演示,这里在Flink控制台配置SLS的源表,然后开启一个连续查询以观察效果。在实际使用过程中,可以直接修改SLS源表,保留其余分析和写出逻辑。
接下来介绍下阿里云Flink中使用SPL实现行过滤与列裁剪功能。
在SLS准备数据
- 开通SLS,在SLS创建Project,Logstore,并创建具有消费Logstore的权限的账号AK/SK。
- 当前Logstore数据使用SLS的的SLB七层日志模拟接入方式产生模拟数据,其中包含10多个字段。

模拟接入会持续产生随机的日志数据,日志内容示例如下:
{
"__source__": "127.0.0.1",
"__tag__:__receive_time__": "1706531737",
"__time__": "1706531727",
"__topic__": "slb_layer7",
"body_bytes_sent": "3577",
"client_ip": "114.137.195.189",
"host": "www.pi.mock.com",
"http_host": "www.cwj.mock.com",
"http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0",
"request_length": "1662",
"request_method": "GET",
"request_time": "31",
"request_uri": "/request/path-0/file-3",
"scheme": "https",
"slbid": "slb-02",
"status": "200",
"upstream_addr": "42.63.187.102",
"upstream_response_time": "32",
"upstream_status": "200",
"vip_addr": "223.18.47.239"
}Logstore中slbid字段有两种值:slb-01和slb-02,对15分钟的日志数据进行slbid统计,可以发现slb-01与slb-02数量相当。

行过滤场景
在数据处理中过滤数据是一种常见需求,在Flink中可以使用filter算子或者SQL中的where条件进行过滤,使用非常方便;但是在Flink使用filter算子,往往意味着数据已经通过网络进入Flink计算引擎中,全量的数据会消耗着网络带宽和Flink的计算性能,这种场景下,SLS SPL为Flink SLS Connector提供了一种支持过滤“下推”的能力,通过配置SLS Connector的query语句中,过滤条件,即可实现过滤条件下推。避免全量数据传输和全量数据过滤计算。

创建SQL作业
在阿里云Flink控制台创建一个空白的SQL的流作业草稿,点击下一步,进入作业编写

在作业草稿中输入如下创建临时表的语句:
CREATE TEMPORARY TABLE sls_input(
request_uri STRING,
scheme STRING,
slbid STRING,
status STRING,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` STRING METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
'accessId' = '${ak}',
'accessKey' = '${sk}',
'starttime' = '2024-01-21 00:00:00',
'project' ='${project}',
'logstore' ='test-nginx-log',
'query' = '* | where slbid = ''slb-01'''
);- 这里为了演示方便,仅设置request_uri、scheme、slbid、status和一些元数据字段作为表字段。
- ${ak}、${sk}、${project}替换为具有Logstore消费权限的账号。
- endpoint:填写同地域的SLS的私网地址。
- query:填写SLS的SPL语句,这里填写了SPL的过滤语句:* | where slbid = ''slb-01'',注意在阿里云Flink的SQL作业开发中,字符串需要使用英文单引号进行转义。
连续查询及效果
在作业中输入分析语句,按照slbid进行聚合查询,动态查询会根据日志的变化,实时刷新数字
SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid
点击右上角调试按钮,进行调试,可以看到结果中 slbid的字段值,始终是slb-01。

可以看出设置了SPL语句后,sls_input仅包含slbid=‘slb-01’的数据,其他不符合条件的数据被过滤掉了。
流量对比
使用SPL后,可以看出在SLS的写流量不变的情况下,Flink对SLS的读流量有大幅度下降;同时在过滤占主要很多Flink CU的场景下,经过过滤后,Flink CU也会有相应的降低。

列裁剪场景
在数据处理中列裁剪也是一种常见需求,在原始数据中,往往会有全量的字段,但是实际的计算只需要特定的字段;类似需要在Flink中可以使用project算子或者SQL中的select进行列裁剪与变换,使用Flink使用project算子,往往意味着数据已经通过网络进入Flink计算引擎中,全量的数据会消耗着网络带宽和Flink的计算性能,这种场景下,SLS SPL为Flink SLS Connector提供了一种支持投影下推的能力,通过配置SLS Connector的query参数,即可实现投影字段下推。避免全量数据传输和全量数据过滤计算。
创建SQL作业
创建步骤同行过滤场景,在作业草稿中输入如下创建临时表的语句,这里query参数配置进行了修改,在过滤的基础上增加了投影语句,可以实现从SLS服务端仅拉取特定字段的内容。
CREATE TEMPORARY TABLE sls_input(
request_uri STRING,
scheme STRING,
slbid STRING,
status STRING,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` STRING METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
'accessId' = '${ak}',
'accessKey' = '${sk}',
'starttime' = '2024-01-21 00:00:00',
'project' ='${project}',
'logstore' ='test-nginx-log',
'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"'
);
为了效果,下面分行展示语句中配置,在Flink语句中任然需要单行配置。
*
| where slbid = ''slb-01''
| project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"上面使用了SLS SPL的管道式语法来实现数据过滤后投影的操作,类似Unix管道,使用|符号将不同指令进行分割,上一条指令的输出作为下一条指令的输入,最后的指令的输出表示整个管道的输出。
连续查询及效果

在作业中输入分析语句,可以看到,结果与行过滤场景结果类似。
SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid
注意:这里与行过滤不同的是,上面的行过滤场景会返回全量的字段,而当前的语句令SLS Connector只返回特定的字段,再次减少了数据的网络传输。
SPL还可以做什么
- 上述实例中演示了使用SLS SPL的过滤和投影功能来实现SLS Connector的“下推”功能,可以有效地减少网络流量和Flink CU的使用。可以避免在Flink进行计算之前,进行额外的过滤和投影计算消耗。
- SLS SPL的功能不止于过滤与投影,SLS SPL 完整支持的语法可以参考文档:SPL指令。同时,SPL管道式语法已全面支持在Flink Connector中进行配置。
- SLS SPL支持对于数据进行预处理,比如正则字段、JSON字段,CSV字段展开;数据格式转换,列的增加和减少;过滤等。除了用于消费场景,在SLS的Scan模式与采集端都会应用场景,以便用户在采集端、消费端都可以使用SPL的能力。