Paimon Flink Procedure

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/table/procedures/

Flink interfaces

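Flink procedures are tied to the catalog: Flink looks them up through the current catalog. A custom Flink procedure must implement the Procedure interface:

// org.apache.flink.table.procedures.Procedure: a marker interface that declares no methods
public interface Procedure {
}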
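Because Procedure declares no methods, Flink finds public methods named call on the implementation by reflection (in real Flink their first parameter is a ProcedureContext). The self-contained sketch below imitates only that reflective lookup. Procedure is a local stand-in for Flink's interface, EchoProcedure is hypothetical, and ProcedureContext and Flink's argument type inference are deliberately left out.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class ProcedureCallSketch {

    // Local stand-in for Flink's marker interface (no methods).
    interface Procedure {}

    // Hypothetical procedure. Real Flink procedures take a ProcedureContext as the first argument.
    public static class EchoProcedure implements Procedure {
        public String[] call(String message) {
            return new String[] {"echo: " + message};
        }
    }

    // Finds a public method named "call" with a matching parameter count and invokes it.
    static Object invokeCall(Procedure procedure, Object... args) {
        for (Method m : procedure.getClass().getMethods()) {
            if (m.getName().equals("call") && m.getParameterCount() == args.length) {
                try {
                    return m.invoke(procedure, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("call failed", e);
                }
            }
        }
        throw new IllegalArgumentException(
                "No call method with " + args.length + " argument(s) on "
                        + procedure.getClass().getName());
    }

    public static void main(String[] args) {
        String[] result = (String[]) invokeCall(new EchoProcedure(), "hello");
        System.out.println(Arrays.toString(result)); // prints [echo: hello]
    }
}
```

Matching only on name and arity keeps the sketch short; Flink's own resolution also considers parameter types and hints.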

In addition, the custom catalog must implement the following two methods:

// Default methods of org.apache.flink.table.catalog.Catalog (Flink 1.18+).
// A catalog that supports procedures must override both.
default List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException {
    throw new UnsupportedOperationException(String.format("listProcedures is not implemented for %s.", this.getClass()));
}

default Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
    throw new UnsupportedOperationException(String.format("getProcedure is not implemented for %s.", this.getClass()));
}

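listProcedures returns the names of all custom procedures. The procedurePath passed to getProcedure carries the name of the requested procedure, and getProcedure must return the procedure implementation that corresponds to that name.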
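To make this contract concrete, here is a minimal in-memory sketch of the two methods. It is not Flink's API: Procedure is a local stand-in, plain strings replace ObjectPath, IllegalArgumentException replaces DatabaseNotExistException, and getProcedure returns an Optional (as Paimon's ProcedureUtil does) instead of throwing ProcedureNotExistException. The stub class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;

public class ProcedureCatalogSketch {

    // Local stand-in for Flink's marker interface org.apache.flink.table.procedures.Procedure.
    interface Procedure {}

    // Hypothetical procedure stubs.
    static class CompactStub implements Procedure {}
    static class CreateTagStub implements Procedure {}

    private final Set<String> databases;
    // Procedure name -> supplier creating a fresh instance per lookup (sorted by name).
    private final Map<String, Supplier<Procedure>> registry = new TreeMap<>();

    ProcedureCatalogSketch(Set<String> databases) {
        this.databases = databases;
        registry.put("compact", CompactStub::new);
        registry.put("create_tag", CreateTagStub::new);
    }

    // Mirrors listProcedures: reject unknown databases, otherwise return every registered name.
    List<String> listProcedures(String dbName) {
        if (!databases.contains(dbName)) {
            throw new IllegalArgumentException("Database " + dbName + " does not exist");
        }
        return Collections.unmodifiableList(new ArrayList<>(registry.keySet()));
    }

    // Mirrors getProcedure: resolve by the object name that an ObjectPath would carry.
    Optional<Procedure> getProcedure(String dbName, String procedureName) {
        if (!databases.contains(dbName)) {
            return Optional.empty();
        }
        Supplier<Procedure> factory = registry.get(procedureName);
        return factory == null ? Optional.empty() : Optional.of(factory.get());
    }

    public static void main(String[] args) {
        ProcedureCatalogSketch catalog = new ProcedureCatalogSketch(Set.of("default"));
        System.out.println(catalog.listProcedures("default")); // prints [compact, create_tag]
        System.out.println(catalog.getProcedure("default", "compact").isPresent()); // prints true
        System.out.println(catalog.getProcedure("default", "missing").isPresent()); // prints false
    }
}
```

The TreeMap keeps listProcedures output in a stable order, and the suppliers give every lookup its own instance so per-lookup state is not shared.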

Paimon implementation

listProcedures

// FlinkCatalog (Paimon's implementation of Flink's Catalog)
public List<String> listProcedures(String dbName)
        throws DatabaseNotExistException, CatalogException {
    if (!databaseExists(dbName)) {
        throw new DatabaseNotExistException(name, dbName);
    }

    return ProcedureUtil.listProcedures();
}

// ProcedureUtil
public static List<String> listProcedures() {
    // Identifiers of every SPI-registered factory that is a ProcedureBase
    return Collections.unmodifiableList(
            FactoryUtil.discoverIdentifiers(
                    ProcedureBase.class.getClassLoader(), ProcedureBase.class));
}

// FactoryUtil
public static <T extends Factory> List<String> discoverIdentifiers(
        ClassLoader classLoader, Class<T> factoryClass) {
    // Load all Factory implementations via SPI
    final List<Factory> factories = discoverFactories(classLoader);

    // Keep only subtypes of factoryClass and map each one to its identifier
    return factories.stream()
            .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
            .map(Factory::identifier)
            .collect(Collectors.toList());
}

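All Paimon procedures extend the abstract class ProcedureBase, which in turn implements Flink's Procedure interface. Paimon loads every ProcedureBase subclass through SPI and returns each class's identifier, which is the name of the corresponding procedure. The procedure classes registered this way are: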

org.apache.paimon.flink.procedure.CompactDatabaseProcedure
org.apache.paimon.flink.procedure.CompactProcedure
org.apache.paimon.flink.procedure.CreateTagProcedure
org.apache.paimon.flink.procedure.DeleteTagProcedure
org.apache.paimon.flink.procedure.CreateBranchProcedure
org.apache.paimon.flink.procedure.DeleteBranchProcedure
org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
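The discovery step can be sketched with java.util.ServiceLoader, Java's standard SPI mechanism. Factory, ProcedureBase, CompactStub and OtherFactory below are local stand-ins, not Paimon's classes. ServiceLoader only finds providers listed in a META-INF/services file named after the Factory interface, so main demonstrates the filter-and-map step on an explicit list instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

public class SpiDiscoverySketch {

    // Local stand-in for Paimon's Factory: every factory exposes an identifier.
    public interface Factory {
        String identifier();
    }

    // Local stand-in for ProcedureBase: a Factory whose identifier is the procedure name.
    public abstract static class ProcedureBase implements Factory {}

    public static class CompactStub extends ProcedureBase {
        public String identifier() { return "compact"; }
    }

    // A factory that is not a procedure, so the type filter should drop it.
    public static class OtherFactory implements Factory {
        public String identifier() { return "some-format"; }
    }

    // Loads every Factory provider registered under META-INF/services on this class loader.
    static List<Factory> discoverFactories(ClassLoader classLoader) {
        List<Factory> result = new ArrayList<>();
        for (Factory f : ServiceLoader.load(Factory.class, classLoader)) {
            result.add(f);
        }
        return result;
    }

    // Same filter/map as discoverIdentifiers, applied to an explicit factory list.
    static <T extends Factory> List<String> identifiersOf(List<Factory> factories, Class<T> factoryClass) {
        return factories.stream()
                .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
                .map(Factory::identifier)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Factory> factories = Arrays.asList(new CompactStub(), new OtherFactory());
        System.out.println(identifiersOf(factories, ProcedureBase.class)); // prints [compact]
        // Without a service file on the classpath this typically finds no providers.
        System.out.println(discoverFactories(SpiDiscoverySketch.class.getClassLoader()).size());
    }
}
```

Filtering with isAssignableFrom lets one service file serve several factory families: asking for ProcedureBase returns only procedures, while asking for Factory returns everything.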

getProcedure

// FlinkCatalog
public Procedure getProcedure(ObjectPath procedurePath)
        throws ProcedureNotExistException, CatalogException {
    return ProcedureUtil.getProcedure(catalog, procedurePath)
            .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
}

// ProcedureUtil
public static Optional<Procedure> getProcedure(Catalog catalog, ObjectPath procedurePath) {
    // Procedures are only resolved in the system database
    if (!Catalog.SYSTEM_DATABASE_NAME.equals(procedurePath.getDatabaseName())) {
        return Optional.empty();
    }
    try {
        ProcedureBase procedure =
                FactoryUtil.discoverFactory(
                                ProcedureBase.class.getClassLoader(),
                                ProcedureBase.class,
                                procedurePath.getObjectName()) // name of the requested procedure
                        .withCatalog(catalog); // inject the Paimon catalog into the procedure
        return Optional.of(procedure);
    } catch (FactoryException e) {
        // No matching factory: treated as "procedure not found"
        return Optional.empty();
    }
}

// FactoryUtil
public static <T extends Factory> T discoverFactory(
        ClassLoader classLoader, Class<T> factoryClass, String identifier) {
    // Load all Factory implementations via SPI
    final List<Factory> factories = discoverFactories(classLoader);

    // Keep only subtypes of factoryClass (here: ProcedureBase)
    final List<Factory> foundFactories =
            factories.stream()
                    .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
                    .collect(Collectors.toList());

    if (foundFactories.isEmpty()) {
        throw new FactoryException(
                String.format(
                        "Could not find any factories that implement '%s' in the classpath.",
                        factoryClass.getName()));
    }

    // Keep only factories whose identifier equals the requested name
    final List<Factory> matchingFactories =
            foundFactories.stream()
                    .filter(f -> f.identifier().equals(identifier))
                    .collect(Collectors.toList());

    // ......

    return (T) matchingFactories.get(0);
}

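As shown, getProcedure also loads the classes through SPI and returns the implementation whose identifier matches the requested procedure name. A procedure is only resolved when procedurePath points at the system database (Catalog.SYSTEM_DATABASE_NAME, which is sys in Paimon), so procedures are called as sys.<name>; for any other database ProcedureUtil returns Optional.empty() and FlinkCatalog throws ProcedureNotExistException. Before the procedure is returned, the Paimon catalog is injected into it through withCatalog.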
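The resolution path of getProcedure (system-database check, identifier match, catalog injection) can be sketched as follows. ProcedureBase and the stubs are simplified stand-ins, the candidates come from a fixed list instead of SPI, and SYSTEM_DATABASE_NAME is assumed to be "sys". How the original code handles zero or several matches is elided (......), so the duplicate check here is this sketch's own choice, not Paimon's behavior.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class GetProcedureSketch {

    // Assumed to match Paimon's Catalog.SYSTEM_DATABASE_NAME.
    static final String SYSTEM_DATABASE_NAME = "sys";

    // Stand-in for ProcedureBase: identifier() is the SQL-visible name,
    // withCatalog() injects the catalog the procedure will operate on.
    abstract static class ProcedureBase {
        Object catalog;

        abstract String identifier();

        ProcedureBase withCatalog(Object catalog) {
            this.catalog = catalog;
            return this;
        }
    }

    static class CompactStub extends ProcedureBase {
        String identifier() { return "compact"; }
    }

    static class CreateTagStub extends ProcedureBase {
        String identifier() { return "create_tag"; }
    }

    // Candidate procedures; Paimon discovers these via SPI rather than a fixed list.
    static final List<Supplier<ProcedureBase>> CANDIDATES =
            Arrays.<Supplier<ProcedureBase>>asList(CompactStub::new, CreateTagStub::new);

    static Optional<ProcedureBase> getProcedure(Object catalog, String databaseName, String objectName) {
        // Only names under the system database are treated as procedures.
        if (!SYSTEM_DATABASE_NAME.equals(databaseName)) {
            return Optional.empty();
        }
        List<ProcedureBase> matching = CANDIDATES.stream()
                .map(Supplier::get)
                .filter(p -> p.identifier().equals(objectName))
                .collect(Collectors.toList());
        if (matching.size() > 1) {
            // This sketch's own choice: duplicate identifiers are a configuration error.
            throw new IllegalStateException("Multiple procedures named " + objectName);
        }
        // No match -> empty; otherwise inject the catalog, mirroring .withCatalog(catalog).
        return matching.stream().findFirst().map(p -> p.withCatalog(catalog));
    }

    public static void main(String[] args) {
        Object catalog = "paimon-catalog"; // hypothetical catalog handle
        System.out.println(getProcedure(catalog, "sys", "compact").isPresent());     // prints true
        System.out.println(getProcedure(catalog, "default", "compact").isPresent()); // prints false
        System.out.println(getProcedure(catalog, "sys", "unknown").isPresent());     // prints false
    }
}
```

Returning Optional keeps the "not found" decision in one place: the caller (FlinkCatalog in the original) converts an empty result into ProcedureNotExistException.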
