Apache Paimon: How Procedures Are Implemented
Paimon Flink Procedure
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/table/procedures/
Flink Interfaces
Flink procedures are tied to the catalog. To implement a custom Flink procedure, write a class that implements the Procedure interface:
public interface Procedure {
}
In addition, the custom catalog must implement the following two methods of Flink's Catalog interface. Their default implementations simply throw UnsupportedOperationException:
default List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException {
    throw new UnsupportedOperationException(String.format("listProcedures is not implemented for %s.", this.getClass()));
}

default Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
    throw new UnsupportedOperationException(String.format("getProcedure is not implemented for %s.", this.getClass()));
}
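listProcedures returns the names of all user-implemented procedures in the given database. getProcedure receives a procedurePath that carries the name of the requested procedure, and must return the procedure implementation that corresponds to that name.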
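To make this contract concrete, here is a minimal, self-contained sketch of a catalog that keeps its procedures in a name-to-factory map. Procedure, CompactDemo, CreateTagDemo and DemoCatalog are hypothetical stand-ins, not Flink's real types. getProcedure takes the database and procedure name as two strings instead of an ObjectPath, and IllegalArgumentException and NoSuchElementException replace Flink's DatabaseNotExistException and ProcedureNotExistException so that it runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's Procedure marker interface.
interface Procedure {}

// Hypothetical procedures, used only for illustration.
class CompactDemo implements Procedure {}

class CreateTagDemo implements Procedure {}

// Hypothetical catalog that fulfils the listProcedures/getProcedure contract.
class DemoCatalog {
    private final Set<String> databases = new HashSet<>(List.of("default", "sys"));
    // procedure name -> factory; LinkedHashMap keeps the listing order stable
    private final Map<String, Supplier<Procedure>> procedures = new LinkedHashMap<>();

    DemoCatalog() {
        procedures.put("compact", CompactDemo::new);
        procedures.put("create_tag", CreateTagDemo::new);
    }

    List<String> listProcedures(String dbName) {
        if (!databases.contains(dbName)) {
            // a real Flink catalog throws DatabaseNotExistException here
            throw new IllegalArgumentException("Database " + dbName + " does not exist");
        }
        return Collections.unmodifiableList(new ArrayList<>(procedures.keySet()));
    }

    // Resolves a procedure by name (the real method receives an ObjectPath).
    Procedure getProcedure(String dbName, String procedureName) {
        Supplier<Procedure> factory = procedures.get(procedureName);
        if (!databases.contains(dbName) || factory == null) {
            // a real Flink catalog throws ProcedureNotExistException here
            throw new NoSuchElementException(
                    "Procedure " + dbName + "." + procedureName + " does not exist");
        }
        return factory.get();
    }
}
```

Storing factories rather than instances means every getProcedure call returns a fresh procedure object.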
Paimon Implementation
listProcedures
// in Paimon's Flink catalog
public List<String> listProcedures(String dbName)
        throws DatabaseNotExistException, CatalogException {
    if (!databaseExists(dbName)) {
        throw new DatabaseNotExistException(name, dbName);
    }
    return ProcedureUtil.listProcedures();
}

// in ProcedureUtil
public static List<String> listProcedures() {
    return Collections.unmodifiableList(
            FactoryUtil.discoverIdentifiers(
                    ProcedureBase.class.getClassLoader(), ProcedureBase.class));
}

// in FactoryUtil
public static <T extends Factory> List<String> discoverIdentifiers(
        ClassLoader classLoader, Class<T> factoryClass) {
    final List<Factory> factories = discoverFactories(classLoader);
    return factories.stream()
            // keep only factories of the requested type (here: ProcedureBase)
            .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
            // each identifier is a procedure name
            .map(Factory::identifier)
            .collect(Collectors.toList());
}
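Every Paimon procedure extends the abstract class ProcedureBase, which implements Flink's Procedure interface. Because discoverIdentifiers requires T extends Factory, ProcedureBase must also implement Paimon's Factory interface. listProcedures uses SPI to load every class that extends ProcedureBase and returns each class's identifier, which is the name of the corresponding procedure. The procedure classes registered this way are: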
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
org.apache.paimon.flink.procedure.CompactProcedure
org.apache.paimon.flink.procedure.CreateTagProcedure
org.apache.paimon.flink.procedure.DeleteTagProcedure
org.apache.paimon.flink.procedure.CreateBranchProcedure
org.apache.paimon.flink.procedure.DeleteBranchProcedure
org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
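The identifier-discovery step can be sketched without the SPI machinery. In this self-contained sketch, Factory, DemoProcedureBase and the Demo* classes are hypothetical stand-ins for Paimon's types. The factory list is hard-coded, whereas in a real SPI setup it would come from java.util.ServiceLoader reading META-INF/services entries. The non-procedure DemoCatalogFactory shows why the isAssignableFrom filter matters.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for Paimon's Factory interface: every factory has an identifier.
interface Factory {
    String identifier();
}

// Hypothetical counterpart of ProcedureBase: a procedure is also a Factory.
abstract class DemoProcedureBase implements Factory {}

class DemoCompactProcedure extends DemoProcedureBase {
    @Override
    public String identifier() {
        return "compact";
    }
}

class DemoCreateTagProcedure extends DemoProcedureBase {
    @Override
    public String identifier() {
        return "create_tag";
    }
}

// A factory that is not a procedure; the type filter must drop it.
class DemoCatalogFactory implements Factory {
    @Override
    public String identifier() {
        return "paimon";
    }
}

class IdentifierDiscovery {
    // Stand-in for SPI discovery: a real setup would obtain this list from
    // ServiceLoader.load(Factory.class, classLoader).
    static List<Factory> discoverFactories() {
        return List.of(
                new DemoCatalogFactory(), new DemoCompactProcedure(), new DemoCreateTagProcedure());
    }

    // Keep the factories assignable to factoryClass and return their identifiers.
    static <T extends Factory> List<String> discoverIdentifiers(Class<T> factoryClass) {
        return discoverFactories().stream()
                .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
                .map(Factory::identifier)
                .collect(Collectors.toList());
    }
}
```

Passing DemoProcedureBase.class yields only the procedure names. Passing Factory.class would list every registered factory, including the catalog factory.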
getProcedure
// in Paimon's Flink catalog
public Procedure getProcedure(ObjectPath procedurePath)
        throws ProcedureNotExistException, CatalogException {
    return ProcedureUtil.getProcedure(catalog, procedurePath)
            .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
}
// in ProcedureUtil
public static Optional<Procedure> getProcedure(Catalog catalog, ObjectPath procedurePath) {
    // procedures are only resolved in the system database
    if (!Catalog.SYSTEM_DATABASE_NAME.equals(procedurePath.getDatabaseName())) {
        return Optional.empty();
    }
    try {
        ProcedureBase procedure =
                FactoryUtil.discoverFactory(
                                ProcedureBase.class.getClassLoader(),
                                ProcedureBase.class,
                                procedurePath.getObjectName()) // the procedure name passed in by the caller
                        .withCatalog(catalog);
        return Optional.of(procedure);
    } catch (FactoryException e) {
        // no matching procedure: report "not found" instead of failing
        return Optional.empty();
    }
}
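The control flow of ProcedureUtil.getProcedure can be illustrated with a small, self-contained sketch. DemoObjectPath, DemoProcedure, NamedProcedure, ProcedureLookup and the hard-coded REGISTRY are hypothetical stand-ins, and the registry replaces SPI discovery. The value "sys" assumes Paimon's system database name, the database used in calls such as CALL sys.compact(...). As in the original, the lookup returns an empty Optional for any other database or an unknown name. A separate catalog-level method converts that into an exception, here NoSuchElementException instead of ProcedureNotExistException.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's ObjectPath: database name + object name.
final class DemoObjectPath {
    final String databaseName;
    final String objectName;

    DemoObjectPath(String databaseName, String objectName) {
        this.databaseName = databaseName;
        this.objectName = objectName;
    }
}

// Hypothetical procedure type for the sketch.
interface DemoProcedure {
    String name();
}

final class NamedProcedure implements DemoProcedure {
    private final String name;

    NamedProcedure(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

class ProcedureLookup {
    // Assumed value of Paimon's Catalog.SYSTEM_DATABASE_NAME.
    static final String SYSTEM_DATABASE_NAME = "sys";

    // Hypothetical registry replacing SPI discovery: identifier -> factory.
    static final Map<String, Supplier<DemoProcedure>> REGISTRY =
            Map.of(
                    "compact", () -> new NamedProcedure("compact"),
                    "create_tag", () -> new NamedProcedure("create_tag"));

    // Mirrors ProcedureUtil.getProcedure: empty unless the path is in the
    // system database and a procedure with that identifier exists.
    static Optional<DemoProcedure> getProcedure(DemoObjectPath path) {
        if (!SYSTEM_DATABASE_NAME.equals(path.databaseName)) {
            return Optional.empty();
        }
        Supplier<DemoProcedure> factory = REGISTRY.get(path.objectName);
        if (factory == null) {
            return Optional.empty();
        }
        return Optional.of(factory.get());
    }

    // Mirrors the catalog method: turn "not found" into an exception.
    static DemoProcedure getProcedureOrThrow(DemoObjectPath path) {
        return getProcedure(path)
                .orElseThrow(
                        () -> new NoSuchElementException(
                                "Procedure " + path.databaseName + "." + path.objectName
                                        + " does not exist"));
    }
}
```

Keeping the Optional-returning lookup separate from the throwing catalog method mirrors the split between ProcedureUtil and the catalog in the excerpts above.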
// in FactoryUtil
public static <T extends Factory> T discoverFactory(
        ClassLoader classLoader, Class<T> factoryClass, String identifier) {
    final List<Factory> factories = discoverFactories(classLoader);
    // keep only factories of the requested type
    final List<Factory> foundFactories =
            factories.stream()
                    .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
                    .collect(Collectors.toList());
    if (foundFactories.isEmpty()) {
        throw new FactoryException(
                String.format(
                        "Could not find any factories that implement '%s' in the classpath.",
                        factoryClass.getName()));
    }
    // among those, keep the factories whose identifier matches the requested name
    final List<Factory> matchingFactories =
            foundFactories.stream()
                    .filter(f -> f.identifier().equals(identifier))
                    .collect(Collectors.toList());
    ......
    return (T) matchingFactories.get(0);
}
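getProcedure also uses SPI to load all procedure classes, and it returns the implementation whose identifier matches the requested procedure name. Lookups only succeed in the system database (Catalog.SYSTEM_DATABASE_NAME). For any other database, or when discoverFactory throws a FactoryException, ProcedureUtil returns Optional.empty() and the catalog throws ProcedureNotExistException.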
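The two-stage filtering in discoverFactory, first by type and then by identifier, is sketched below with hypothetical stand-in types (DemoFactory, DemoProcBase, DemoFactoryException and the Demo* factories). The original elides what happens between the identifier filter and the return. The checks for a missing or ambiguous identifier here are therefore assumptions, not Paimon's code. The sketch also replaces the unchecked (T) cast with Class.cast.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for Paimon's Factory, ProcedureBase and FactoryException.
interface DemoFactory {
    String identifier();
}

abstract class DemoProcBase implements DemoFactory {}

class DemoCompact extends DemoProcBase {
    @Override
    public String identifier() {
        return "compact";
    }
}

class DemoRollbackTo extends DemoProcBase {
    @Override
    public String identifier() {
        return "rollback_to";
    }
}

// Same identifier as DemoCompact but not a procedure: the type filter removes it.
class DemoUnrelatedFactory implements DemoFactory {
    @Override
    public String identifier() {
        return "compact";
    }
}

class DemoFactoryException extends RuntimeException {
    DemoFactoryException(String message) {
        super(message);
    }
}

class FactoryDiscovery {
    static <T extends DemoFactory> T discoverFactory(
            List<DemoFactory> factories, Class<T> factoryClass, String identifier) {
        // Step 1: keep only factories of the requested type.
        List<DemoFactory> found = factories.stream()
                .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
                .collect(Collectors.toList());
        if (found.isEmpty()) {
            throw new DemoFactoryException("No factories implement " + factoryClass.getName());
        }
        // Step 2: keep the factories whose identifier equals the requested name.
        List<DemoFactory> matching = found.stream()
                .filter(f -> f.identifier().equals(identifier))
                .collect(Collectors.toList());
        // Assumed checks: reject a missing or ambiguous identifier.
        if (matching.isEmpty()) {
            throw new DemoFactoryException("No factory with identifier '" + identifier + "'");
        }
        if (matching.size() > 1) {
            throw new DemoFactoryException("Multiple factories with identifier '" + identifier + "'");
        }
        // Class.cast performs a checked conversion instead of an unchecked (T) cast.
        return factoryClass.cast(matching.get(0));
    }
}
```

In ProcedureUtil.getProcedure, a FactoryException thrown at this stage is caught and turned into Optional.empty(). A user who calls an unknown procedure therefore sees a "procedure does not exist" error rather than a factory error.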