This article briefly walks through how Flink SQL is parsed and how the relevant filters are pushed down to the TableSource via the SupportsFilterPushDown.applyFilters interface. At the end there is a sample showing how to parse SQL with Flink's own APIs.

Flink SQL Parsing Flow

Throughout this article we use the query SELECT * FROM T WHERE f < 111.321 against a Paimon table as the running example, based on Flink 1.20.

(Figure: overall Flink SQL parsing flow, image-20241230005052100.png)

1. SQL -> SqlNode

Calcite converts the SQL string into a SqlNode.

ParserImpl.class
public List<Operation> parse(String statement) {
        CalciteParser parser = calciteParserSupplier.get();
        FlinkPlannerImpl planner = validatorSupplier.get();

        Optional<Operation> command = EXTENDED_PARSER.parse(statement);
        if (command.isPresent()) {
            return Collections.singletonList(command.get());
        }

        // this is where the SQL string is actually converted into SqlNodes
        SqlNodeList sqlNodeList = parser.parseSqlList(statement);
        List<SqlNode> parsed = sqlNodeList.getList();
        Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");
        return Collections.singletonList(
                SqlNodeToOperationConversion.convert(planner, catalogManager, parsed.get(0))
                        .orElseThrow(() -> new TableException("Unsupported query: " + statement)));
    }
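
To see step 1 in isolation, here is a minimal sketch that parses the running example with plain Calcite. Note that this uses Calcite's default parser, while Flink's CalciteParser actually wraps a parser generated from Flink's own SQL dialect (FlinkSqlParserImpl), so this is only an approximation of what parser.parseSqlList does internally:

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class SqlToSqlNodeDemo {
    public static void main(String[] args) throws SqlParseException {
        // parse the SQL string into a SqlNode tree
        SqlParser parser = SqlParser.create("SELECT * FROM T WHERE f < 111.321");
        SqlNode sqlNode = parser.parseQuery();
        System.out.println(sqlNode.getKind()); // prints SELECT
    }
}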

2. SqlNode -> SqlNode

The SqlNode is then validated by FlinkPlannerImpl: identifiers are resolved against the catalog and types are checked, so for example referencing a non-existent column fails at this step.

SqlNodeToOperationConversion.class
public static Optional<Operation> convert(
            FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
        // validate the query
        final SqlNode validated = flinkPlanner.validate(sqlNode);
        return convertValidatedSqlNode(flinkPlanner, catalogManager, validated);
    }
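
This is also why Parser.parse can fail on syntactically valid SQL. As an illustration (tableEnvironmentInternal is the TableEnvironmentImpl from the sample at the end of this article, and T is assumed to have no column named bad_col), something like the following fails here with a ValidationException rather than a parse error:

// parsing succeeds, but flinkPlanner.validate(...) rejects the unknown column
tableEnvironmentInternal.getParser().parse("SELECT bad_col FROM T");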

3. SqlNode -> Operation (RelNode)

Note: for query statements, the Operation produced from the SqlNode contains a RelNode, whereas DDL statements do not; for example, a CreateTableOperation holds Flink's CatalogTable instead.

The SqlNode is converted into a RelNode, which is then wrapped in an Operation.

org.apache.flink.table.planner.operations.converters.SqlNodeConverters#convertSqlNode
public static Optional<Operation> convertSqlNode(
            SqlNode validatedSqlNode, ConvertContext context) {
        SqlNodeConverter classConverter = CLASS_CONVERTERS.get(validatedSqlNode.getClass());
        if (classConverter != null) {
            return Optional.of(classConverter.convertSqlNode(validatedSqlNode, context));
        }

        SqlNodeConverter sqlKindConverter = SQLKIND_CONVERTERS.get(validatedSqlNode.getKind());
        if (sqlKindConverter != null) {
            return Optional.of(sqlKindConverter.convertSqlNode(validatedSqlNode, context));
        } else {
            return Optional.empty();
        }
    }

The actual conversion is performed by org.apache.flink.table.planner.operations.converters.SqlNodeConverter#convertSqlNode, which has many implementations; the one handling queries is SqlQueryConverter:

public class SqlQueryConverter implements SqlNodeConverter<SqlNode> {
    @Override
    public Operation convertSqlNode(SqlNode node, ConvertContext context) {
        // internally this is implemented via FlinkPlannerImpl.rel(node)
        RelRoot relational = context.toRelRoot(node);
        return new PlannerQueryOperation(relational.project());
    }
}

4. Operation (RelNode) -> RelNode -> Transformation

This is the core step, where the RelNode tree is optimized.

1. First, the RelNode inside the Operation (a LogicalProject) is converted into a RelNode tree rooted at a LogicalSink.

2. Then optimize runs. This is where the filter logic is pushed down to the TableSource through SupportsFilterPushDown.applyFilters: the optimizer applies a set of rules, and org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoTableSourceScanRule is the rule that pushes the filter into the table source scan (a sketch of the source-side interface follows the code below).

PlannerBase.scala

override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    // convert the LogicalProject inside each ModifyOperation into Flink's LogicalSink
    val relNodes = modifyOperations.map(translateToRel)
    // optimize: this is where the filter pushdown rule fires
    val optimizedRelNodes = optimize(relNodes)
    // generate the exec node graph
    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
    // translate into the transformation DAG
    val transformations = translateToPlan(execGraph)
    afterTranslation()
    transformations
  }
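
On the source side, the counterpart of PushFilterIntoTableSourceScanRule is the SupportsFilterPushDown.applyFilters call on the table source. Below is a minimal sketch of a custom source (not Paimon's actual implementation); canConvertToPredicate is a hypothetical helper standing in for a connector's own expression-to-predicate conversion:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

// a real connector would also implement ScanTableSource; omitted here for brevity
public abstract class MyFilterableSource implements SupportsFilterPushDown {

    // filters the source promises to evaluate itself during the scan
    private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            // canConvertToPredicate is hypothetical: a connector such as Paimon
            // tries to translate each expression into its own predicate type
            // and only accepts the ones it understands
            if (canConvertToPredicate(filter)) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        pushedFilters.addAll(accepted);
        // anything returned as "remaining" is still applied by Flink after the
        // scan, so declining a filter is always safe, just less efficient
        return Result.of(accepted, remaining);
    }

    protected abstract boolean canConvertToPredicate(ResolvedExpression filter);
}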

Let's verify in a debugger that the optimized optimizedRelNodes actually carry the filter. I am debugging a query against a Paimon table here, so as the part marked in red shows, the filter has already been converted into a Paimon predicate.

(Figure: debugger view of the optimized RelNodes, image-20241229234804492.png)

3. After that, the exec node graph is generated and translated into the Transformation DAG.
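
If you'd rather not attach a debugger, a quick sanity check (assuming a table T is registered with a connector that supports pushdown) is to print the optimized plan with explainSql; the filter should then show up inside the TableSourceScan operator rather than as a separate Filter node:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainPushdownDemo {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment
                .create(EnvironmentSettings.newInstance().inBatchMode().build());
        // register table T here first, e.g. env.executeSql("CREATE TABLE T (...) WITH (...)")
        System.out.println(env.explainSql("SELECT * FROM T WHERE f < 111.321"));
    }
}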

Sample Code for Parsing SQL

import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;

public class ParseSqlDemo {
    public static void main(String[] args) {
        TableEnvironmentImpl tableEnvironmentInternal = (TableEnvironmentImpl) TableEnvironment
                .create(EnvironmentSettings.newInstance().inBatchMode().build());
        // an INSERT ... SELECT exercises the ModifyOperation branch below;
        // tables S and T must already exist in the catalog for validation to pass
        List<Operation> parsedOperations = tableEnvironmentInternal.getParser()
                .parse("INSERT INTO S SELECT * FROM T WHERE f < 111.321");
        Operation operation = parsedOperations.get(0);
        if (operation instanceof CreateTableOperation) {
            CreateTableOperation createTableOperation = (CreateTableOperation) operation;
            // a CreateTableOperation exposes the CatalogTable, which can be used to validate the table
            CatalogTable catalogTable = createTableOperation.getCatalogTable();
            Map<String, String> options = catalogTable.getOptions();
            Schema unresolvedSchema = catalogTable.getUnresolvedSchema();
        } else if (operation instanceof ModifyOperation) {
            ModifyOperation modifyOperation = (ModifyOperation) operation;
            PlannerQueryOperation child = (PlannerQueryOperation) modifyOperation.getChild();
            RelNode calciteTree = child.getCalciteTree();
            // recurse through the inputs to eventually reach the TableScan
            getTableScanTable(calciteTree);
        }
    }

    // static so it can be called from the static main method
    public static void getTableScanTable(RelNode relNode) {
        if (relNode instanceof TableScan) {
            TableSourceTable table = (TableSourceTable) ((TableScan) relNode).getTable();
            ContextResolvedTable resolvedTable = table.contextResolvedTable();
            ObjectIdentifier identifier = resolvedTable.getIdentifier();
            ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema();
        }
        relNode.getInputs().forEach(ParseSqlDemo::getTableScanTable);
    }
}
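
To exercise the CreateTableOperation branch instead, feed the same parser a DDL statement, reusing tableEnvironmentInternal from above (the datagen connector is just an arbitrary example):

List<Operation> ddlOps = tableEnvironmentInternal.getParser()
        .parse("CREATE TABLE T (f DOUBLE) WITH ('connector' = 'datagen')");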
