在Flink控制台创建SQL流作业,通过CREATE TEMPORARY TABLE语句定义源表时,在WITH子句中配置SPL。关键步骤:首先在TABLE定义中声明清洗后的目标字段(如errorCode、fileName等),字段类型通常为STRING。然后在WITH子句中设置connector为sls,配置endpoint、accessId、accessKey、project和logstore等连接参数。核心是query参数,将SPL语句写入其中。例如:
'query' = '* | project Payload, error, caller | parse-json Payload | parse-regexp error, ''pattern'' as errorJson | ...'
注意在Flink控制台中需要对SPL中的单引号使用双单引号转义,并消除换行符。SPL最终输出的字段列表必须与TABLE中定义的字段一一对应。配置完成后,Flink收到的数据已经是清洗规整后的结构化数据,可以直接用SELECT语句进行业务分析。