Flink SQL Parsing Flow
This post gives a quick walkthrough of how Flink SQL is parsed and how the resulting filters are handed to the TableSource through the SupportsFilterPushDown.applyFilters interface. At the end there is an example of using the Flink APIs to parse SQL yourself.
Flink SQL parsing flow
Throughout this post we use the query SELECT * FROM T WHERE f < 111.321 against a Paimon table as the running example; the Flink version is 1.20.
1. SQL -> SqlNode
Calcite converts the SQL text into a SqlNode.
ParserImpl.class
public List<Operation> parse(String statement) {
    CalciteParser parser = calciteParserSupplier.get();
    FlinkPlannerImpl planner = validatorSupplier.get();
    Optional<Operation> command = EXTENDED_PARSER.parse(statement);
    if (command.isPresent()) {
        return Collections.singletonList(command.get());
    }
    // parseSqlList internally converts the SQL text into SqlNodes via Calcite
    SqlNodeList sqlNodeList = parser.parseSqlList(statement);
    List<SqlNode> parsed = sqlNodeList.getList();
    Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");
    return Collections.singletonList(
            SqlNodeToOperationConversion.convert(planner, catalogManager, parsed.get(0))
                    .orElseThrow(() -> new TableException("Unsupported query: " + statement)));
}
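As a standalone illustration of this step, the sketch below calls Calcite's parser directly on the running example. Flink wraps the same mechanism inside CalciteParser with its own parser configuration, so this is only meant to show the SQL -> SqlNode conversion, not Flink's exact code path; the class name and printout are illustrative.
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class SqlToSqlNodeSketch {
    public static void main(String[] args) throws SqlParseException {
        // parse the running example with Calcite's default parser configuration
        SqlParser parser = SqlParser.create("SELECT * FROM T WHERE f < 111.321");
        SqlNode node = parser.parseQuery();

        // for this query the result is a SqlSelect whose WHERE clause is a
        // SqlBasicCall representing the comparison f < 111.321
        SqlSelect select = (SqlSelect) node;
        System.out.println(select.getWhere());
    }
}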
2. SqlNode -> validated SqlNode
FlinkPlannerImpl validates the SqlNode, resolving identifiers against the catalog and checking types.
SqlNodeToOperationConversion.class
public static Optional<Operation> convert(
        FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
    // validate the query
    final SqlNode validated = flinkPlanner.validate(sqlNode);
    return convertValidatedSqlNode(flinkPlanner, catalogManager, validated);
}
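A quick sketch of what this validation step buys us (the datagen table T is only an assumption made here so the snippet is self-contained): the second statement below still parses into a SqlNode without trouble, but FlinkPlannerImpl.validate rejects the unknown column, and Parser.parse surfaces the error as a ValidationException.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;

public class ValidateSketch {
    public static void main(String[] args) {
        TableEnvironmentImpl tEnv = (TableEnvironmentImpl) TableEnvironment
                .create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql("CREATE TABLE T (f DOUBLE) WITH ('connector' = 'datagen')");

        // passes both parsing and validation
        tEnv.getParser().parse("SELECT * FROM T WHERE f < 111.321");

        // parses fine, but validation rejects the unknown column
        try {
            tEnv.getParser().parse("SELECT unknown_col FROM T");
        } catch (ValidationException e) {
            System.out.println("validation failed: " + e.getMessage());
        }
    }
}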
3. SqlNode -> Operation (RelNode)
The validated SqlNode is converted into a RelNode and wrapped in an Operation.
Note: only query-like statements yield an Operation that carries a RelNode; DDL statements do not. A CreateTableOperation, for example, carries a Flink CatalogTable instead.
org.apache.flink.table.planner.operations.converters.SqlNodeConverters#convertSqlNode
public static Optional<Operation> convertSqlNode(
        SqlNode validatedSqlNode, ConvertContext context) {
    SqlNodeConverter classConverter = CLASS_CONVERTERS.get(validatedSqlNode.getClass());
    if (classConverter != null) {
        return Optional.of(classConverter.convertSqlNode(validatedSqlNode, context));
    }
    SqlNodeConverter sqlKindConverter = SQLKIND_CONVERTERS.get(validatedSqlNode.getKind());
    if (sqlKindConverter != null) {
        return Optional.of(sqlKindConverter.convertSqlNode(validatedSqlNode, context));
    } else {
        return Optional.empty();
    }
}
The conversion itself is carried out by org.apache.flink.table.planner.operations.converters.SqlNodeConverter#convertSqlNode, which has several implementations; for plain queries it is SqlQueryConverter:
public class SqlQueryConverter implements SqlNodeConverter<SqlNode> {
    @Override
    public Operation convertSqlNode(SqlNode node, ConvertContext context) {
        // internally this delegates to FlinkPlannerImpl.rel(node)
        RelRoot relational = context.toRelRoot(node);
        return new PlannerQueryOperation(relational.project());
    }
}
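To make step 3 concrete, here is a small sketch that prints the RelNode tree wrapped in the resulting PlannerQueryOperation. The datagen table T is registered only so the example is self-contained (the original debugging session queried a Paimon table). Note that the plan printed here is still un-optimized, so the WHERE condition shows up as a LogicalFilter above the table scan rather than inside the source.
import org.apache.calcite.plan.RelOptUtil;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;

public class PrintRelNodeSketch {
    public static void main(String[] args) {
        TableEnvironmentImpl tEnv = (TableEnvironmentImpl) TableEnvironment
                .create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql("CREATE TABLE T (f DOUBLE) WITH ('connector' = 'datagen')");

        Operation op = tEnv.getParser().parse("SELECT * FROM T WHERE f < 111.321").get(0);
        PlannerQueryOperation queryOp = (PlannerQueryOperation) op;
        // prints the un-optimized logical plan, including the LogicalFilter for f < 111.321
        System.out.println(RelOptUtil.toString(queryOp.getCalciteTree()));
    }
}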
4. Operation (RelNode) -> RelNode -> Transformation
This is the most important part; the main work is optimizing the RelNode.
1. First, the RelNode (LogicalProject) inside each Operation is converted into a RelNode rooted at a LogicalSink.
2. Then optimize runs. This is where the filter logic is pushed down to the TableSource via SupportsFilterPushDown.applyFilters: the optimizer applies a series of rules, and org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoTableSourceScanRule is the rule that pushes filters into the table source (a sketch of the connector side of this interface follows after this list).
override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  // convert the LogicalProject inside the modifyOperations into Flink's LogicalSink
  val relNodes = modifyOperations.map(translateToRel)
  // optimize
  val optimizedRelNodes = optimize(relNodes)
  // build the exec node graph
  val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
  // build the transformation DAG
  val transformations = translateToPlan(execGraph)
  afterTranslation()
  transformations
}
Next, let's inspect the optimized optimizedRelNodes in the debugger to check whether they still carry the filter. Since the query being debugged here reads a Paimon table, the debug output shows that the filter has already been converted into a Paimon predicate.
3. After that, the exec node graph is generated and translated into the Transformation DAG.
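For reference, here is a minimal, hypothetical sketch of the connector side of this rule: a ScanTableSource implementing SupportsFilterPushDown. This is not Paimon's actual implementation (Paimon translates the accepted expressions into its own Predicate objects); it only shows the contract. The optimizer rule hands the WHERE conjuncts to applyFilters, and the source decides which ones it will evaluate itself and which ones Flink must still apply.
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

public class FilterPushDownSourceSketch implements ScanTableSource, SupportsFilterPushDown {

    // predicates accepted by this source; a real connector would translate them
    // into its own predicate representation here
    private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            // toy rule: only accept simple `<` comparisons, leave everything else to Flink
            if (filter instanceof CallExpression
                    && BuiltInFunctionDefinitions.LESS_THAN.equals(
                            ((CallExpression) filter).getFunctionDefinition())) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        pushedFilters.addAll(accepted);
        // accepted filters are applied inside the source; remaining ones stay in the plan
        return Result.of(accepted, remaining);
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        throw new UnsupportedOperationException("reading is omitted in this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        FilterPushDownSourceSketch copy = new FilterPushDownSourceSketch();
        copy.pushedFilters.addAll(pushedFilters);
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "filter-push-down-sketch";
    }
}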
Example: parsing SQL in code
public static void main(String[] args) {
    TableEnvironmentImpl tableEnvironmentInternal = (TableEnvironmentImpl) TableEnvironment
            .create(EnvironmentSettings.newInstance().inBatchMode().build());
    // register a throwaway source and sink table so that validation succeeds
    // (datagen/blackhole are used only to keep the example self-contained)
    tableEnvironmentInternal.executeSql("CREATE TABLE T (f DOUBLE) WITH ('connector' = 'datagen')");
    tableEnvironmentInternal.executeSql("CREATE TABLE S (f DOUBLE) WITH ('connector' = 'blackhole')");
    // parse an INSERT so that the ModifyOperation branch below is taken;
    // the WHERE clause is the running example filter
    List<Operation> parsedOperations = tableEnvironmentInternal.getParser()
            .parse("INSERT INTO S SELECT * FROM T WHERE f < 111.321");
    Operation operation = parsedOperations.get(0);
    if (operation instanceof CreateTableOperation) {
        CreateTableOperation createTableOperation = (CreateTableOperation) operation;
        // a CreateTableOperation exposes the CatalogTable, which can be used to validate the table
        CatalogTable catalogTable = createTableOperation.getCatalogTable();
        Map<String, String> options = catalogTable.getOptions();
        Schema unresolvedSchema = catalogTable.getUnresolvedSchema();
    } else if (operation instanceof ModifyOperation) {
        ModifyOperation modifyOperation = (ModifyOperation) operation;
        PlannerQueryOperation child = (PlannerQueryOperation) modifyOperation.getChild();
        // the tree here is still un-optimized; the filter push-down of step 4 has not run yet
        RelNode calciteTree = child.getCalciteTree();
        // recurse through the inputs until the TableScan (and thus the table) is reached
        getTableScanTable(calciteTree);
    }
}

public static void getTableScanTable(RelNode relNode) {
    if (relNode instanceof TableScan) {
        TableSourceTable table = (TableSourceTable) ((TableScan) relNode).getTable();
        ContextResolvedTable resolvedTable = table.contextResolvedTable();
        ObjectIdentifier identifier = resolvedTable.getIdentifier();
        ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema();
    }
    for (RelNode input : relNode.getInputs()) {
        getTableScanTable(input);
    }
}