Apache Paimon Flink Connector Implementation
A complete walkthrough of the Flink Paimon connector
To connect Flink to Paimon, the connector has to implement Flink's Source and Sink interfaces as well as the corresponding Catalog interface.
Source interface
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/
For Flink to operate on Paimon tables, the connector must implement DynamicTableSourceFactory:
public interface DynamicTableSourceFactory extends DynamicTableFactory {
DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context var1);
}
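Beyond createDynamicTableSource, a factory also has to expose an identifier and its config options, and be registered for Flink's SPI discovery via a META-INF/services/org.apache.flink.table.factories.Factory file. The skeleton below is only a hypothetical minimal sketch of such a factory (the class, option and source names are made up; it is not Paimon's actual FlinkTableFactory):
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
/** Hypothetical skeleton only; Paimon's real factory is considerably more involved. */
public class MySourceFactory implements DynamicTableSourceFactory {
    // hypothetical option, for illustration only
    public static final ConfigOption<String> PATH =
            ConfigOptions.key("path").stringType().noDefaultValue();
    @Override
    public String factoryIdentifier() {
        return "my-connector"; // matches the 'connector' option in the DDL
    }
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PATH);
        return options;
    }
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // read the validated options and hand them to the concrete source;
        // MyScanSource is hypothetical and stands in for a real ScanTableSource.
        String path = context.getCatalogTable().getOptions().get(PATH.key());
        return new MyScanSource(path);
    }
}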
The purpose of this interface is to create a DynamicTableSource. A user-defined source comes in two flavors: ScanTableSource, the regular source used to read data from a table, and LookupTableSource, a source whose table acts as the lookup side of a lookup join. If you want to lookup join a Paimon table in Flink SQL, this second kind of source has to be implemented as well.
ScanTableSource
(Flink interface)
public interface ScanTableSource extends DynamicTableSource {
ChangelogMode getChangelogMode();
ScanRuntimeProvider getScanRuntimeProvider(ScanContext var1);
@PublicEvolving
public interface ScanRuntimeProvider {
boolean isBounded();
}
@PublicEvolving
public interface ScanContext extends DynamicTableSource.Context {
}
}
When implementing this interface, the main job is to return a ScanRuntimeProvider.
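For reference, Flink ships several concrete ScanRuntimeProvider variants (SourceFunctionProvider, InputFormatProvider, SourceProvider, DataStreamScanProvider). The simplest one just wraps a FLIP-27 Source, as in the sketch below (illustrative only; mySource is a hypothetical Source<RowData, ?, ?>). Paimon instead opts for DataStreamScanProvider, shown further down, so it can shape the DataStream itself.
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
// Illustrative only: hand a FLIP-27 source straight to the planner.
// 'mySource' stands in for a concrete Source<RowData, ?, ?> implementation.
ScanTableSource.ScanRuntimeProvider provider = SourceProvider.of(mySource);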
Paimon's implementation of this interface:
public abstract class BaseTableSource implements ScanTableSource {
private final FlinkTableSource source; // Paimon wraps the concrete implementation, FlinkTableSource, inside BaseTableSource.
public BaseTableSource(FlinkTableSource source) {
this.source = source;
}
@Override
public ChangelogMode getChangelogMode() {
return source.getChangelogMode();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
return source.getScanRuntimeProvider(runtimeProviderContext);
}
@Override
public String asSummaryString() {
return source.asSummaryString();
}
}
FlinkTableSource(Paimon)
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
LogSourceProvider logSourceProvider = null;
if (logStoreTableFactory != null) {
logSourceProvider =
logStoreTableFactory.createSourceProvider(context, scanContext, projectFields);
}
WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
Options options = Options.fromMap(table.options());
if (watermarkStrategy != null) {
WatermarkEmitStrategy emitStrategy = options.get(SCAN_WATERMARK_EMIT_STRATEGY);
if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
watermarkStrategy = new OnEventWatermarkStrategy(watermarkStrategy);
}
Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
if (idleTimeout != null) {
watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
}
String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
if (watermarkAlignGroup != null) {
try {
watermarkStrategy =
WatermarkAlignUtils.withWatermarkAlignment(
watermarkStrategy,
watermarkAlignGroup,
options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
} catch (NoSuchMethodError error) {
throw new RuntimeException(
"Flink 1.14 does not support watermark alignment, please check your Flink version.",
error);
}
}
}
FlinkSourceBuilder sourceBuilder =
new FlinkSourceBuilder(tableIdentifier, table)
.withContinuousMode(streaming)
.withLogSourceProvider(logSourceProvider)
.withProjection(projectFields)
.withPredicate(predicate)
.withLimit(limit)
.withWatermarkStrategy(watermarkStrategy)
.withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
return new PaimonDataStreamScanProvider(
!streaming, env -> configureSource(sourceBuilder, env));
}
private DataStream<RowData> configureSource(
FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
Options options = Options.fromMap(this.table.options());
Configuration envConfig = (Configuration) env.getConfiguration();
if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
options.set(
FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
}
Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
if (streaming) {
parallelism = options.get(CoreOptions.BUCKET);
} else {
scanSplitsForInference();
parallelism = splitStatistics.splitNumber();
if (null != limit && limit > 0) {
int limitCount =
limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
parallelism = Math.min(parallelism, limitCount);
}
parallelism = Math.max(1, parallelism);
}
parallelism =
Math.min(
parallelism,
options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
}
return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
}
As the code above shows, Paimon returns a PaimonDataStreamScanProvider, a class that implements Flink's DataStreamScanProvider interface:
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
default DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
return this.produceDataStream(execEnv);
}
/** @deprecated */
@Deprecated
default DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
throw new UnsupportedOperationException("This method is deprecated. Use produceDataStream(ProviderContext, StreamExecutionEnvironment) instead");
}
}
Implementing this interface boils down to taking a StreamExecutionEnvironment and returning a DataStream; that is all it takes to plug the source into Flink.
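PaimonDataStreamScanProvider itself is not listed above; assuming it does little more than remember whether the source is bounded and delegate stream construction to the function handed over in getScanRuntimeProvider, a simplified sketch of such a wrapper could look like this (an approximation, not the actual Paimon class):
import java.util.function.Function;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;
/** Simplified sketch of such a wrapper; not the actual Paimon class. */
public class SketchDataStreamScanProvider implements DataStreamScanProvider {
    private final boolean bounded;
    private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer;
    public SketchDataStreamScanProvider(
            boolean bounded, Function<StreamExecutionEnvironment, DataStream<RowData>> producer) {
        this.bounded = bounded;
        this.producer = producer;
    }
    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        // build the DataStream lazily against the planner-provided environment
        return producer.apply(execEnv);
    }
    @Override
    public boolean isBounded() {
        // batch query -> bounded, streaming query -> unbounded
        return bounded;
    }
}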
Paimon uses a FlinkSourceBuilder to construct that DataStream.
FlinkSourceBuilder(Paimon)
public DataStream<RowData> build() {
if (env == null) {
throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
}
if (isContinuous) {
TableScanUtils.streamingReadingValidate(table);
// TODO visit all options through CoreOptions
StartupMode startupMode = CoreOptions.startupMode(conf);
StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf);
if (logSourceProvider != null && streamingReadMode != FILE) {
if (startupMode != StartupMode.LATEST_FULL) {
return toDataStream(logSourceProvider.createSource(null));
} else {
return toDataStream(
HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
LogHybridSourceFactory.buildHybridFirstSource(
table, projectedFields, predicate))
.addSource(
new LogHybridSourceFactory(logSourceProvider),
Boundedness.CONTINUOUS_UNBOUNDED)
.build());
}
} else {
if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
return buildAlignedContinuousFileSource();
} else if (conf.contains(CoreOptions.CONSUMER_ID)
&& conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
== CoreOptions.ConsumerMode.EXACTLY_ONCE) {
return buildContinuousStreamOperator();
} else {
return buildContinuousFileSource();
}
}
} else {
return buildStaticFileSource();
}
}
As you can see, Paimon builds a different DataStream depending on the configuration. Here we only look at the most basic streaming-read source, buildContinuousFileSource.
private DataStream<RowData> buildContinuousFileSource() {
return toDataStream(
new ContinuousFileStoreSource(
createReadBuilder(),
table.options(),
limit,
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.FIXED));
}
private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
DataStreamSource<RowData> dataStream =
env.fromSource(
source,
watermarkStrategy == null
? WatermarkStrategy.noWatermarks()
: watermarkStrategy,
tableIdentifier.asSummaryString(),
produceTypeInfo());
if (parallelism != null) {
dataStream.setParallelism(parallelism);
}
return dataStream;
}
The code above shows that Paimon creates a ContinuousFileStoreSource.
ContinuousFileStoreSource extends FlinkSource, and FlinkSource in turn implements Flink's Source interface:
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends SourceReaderFactory<T, SplitT> {
Boundedness getBoundedness();
SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> var1) throws Exception;
SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> var1, EnumChkT var2) throws Exception;
SimpleVersionedSerializer<SplitT> getSplitSerializer();
SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}
A Source consists of two main components:
- SplitEnumerator: discovers and assigns splits (a split can be a file, a partition, and so on).
- Reader: reads the actual data from the assigned splits.
Paimon implements the reader as FileStoreSourceReader:
public class FileStoreSourceReader
extends SingleThreadMultiplexSourceReaderBase<
RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {
private final IOManager ioManager;
private long lastConsumeSnapshotId = Long.MIN_VALUE;
public FileStoreSourceReader(
SourceReaderContext readerContext,
TableRead tableRead,
FileStoreSourceReaderMetrics metrics,
IOManager ioManager,
@Nullable Long limit) {
// limiter is created in SourceReader, it can be shared in all split readers
super(
() ->
new FileStoreSourceSplitReader(
tableRead, RecordLimiter.create(limit), metrics),
(element, output, state) ->
FlinkRecordsWithSplitIds.emitRecord(element, output, state, metrics),
readerContext.getConfiguration(),
readerContext);
this.ioManager = ioManager;
}
public FileStoreSourceReader(
SourceReaderContext readerContext,
TableRead tableRead,
FileStoreSourceReaderMetrics metrics,
IOManager ioManager,
@Nullable Long limit,
FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
elementsQueue) {
super(
elementsQueue,
() ->
new FileStoreSourceSplitReader(
tableRead, RecordLimiter.create(limit), metrics),
(element, output, state) ->
FlinkRecordsWithSplitIds.emitRecord(element, output, state, metrics),
readerContext.getConfiguration(),
readerContext);
this.ioManager = ioManager;
}
@Override
public void start() {
// we request a split only if we did not get splits during the checkpoint restore
if (getNumberOfCurrentlyAssignedSplits() == 0) {
context.sendSplitRequest();
}
}
@Override
protected FileStoreSourceSplitState initializedState(FileStoreSourceSplit split) {
return new FileStoreSourceSplitState(split);
}
@Override
protected FileStoreSourceSplit toSplitType(
String splitId, FileStoreSourceSplitState splitState) {
return splitState.toSourceSplit();
}
@Override
public void close() throws Exception {
super.close();
ioManager.close();
}
}
and the split enumerator as ContinuousFileSplitEnumerator:
public class ContinuousFileSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
// Roughly: in streaming mode it keeps scanning the table for new snapshots,
// turns them into FileStoreSourceSplits and assigns them to requesting readers;
// unassigned splits are stored in PendingSplitsCheckpoint on checkpoints.
// (Body omitted here.)
}
That is the code path through which Flink's scan source connects to Paimon.
LookupTableSource
LookupTableSource is the interface implemented so that a table can serve as the lookup side of a lookup join. At runtime, the LookupTableSource looks up rows in the external storage system by key.
LookupTableSource(Flink)
public interface LookupTableSource extends DynamicTableSource {
LookupRuntimeProvider getLookupRuntimeProvider(LookupContext var1);
@PublicEvolving
public interface LookupRuntimeProvider {
}
@PublicEvolving
public interface LookupContext extends DynamicTableSource.Context {
int[][] getKeys();
}
}
Paimon's DataTableSource implements this interface:
DataTableSource
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
if (limit != null) {
throw new RuntimeException(
"Limit push down should not happen in Lookup source, but it is " + limit);
}
int[] projection =
projectFields == null
? IntStream.range(0, table.rowType().getFieldCount()).toArray()
: Projection.of(projectFields).toTopLevelIndexes();
int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
Options options = new Options(table.options());
boolean enableAsync = options.get(LOOKUP_ASYNC);
int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER);
return LookupRuntimeProviderFactory.create(
new FileStoreLookupFunction(table, projection, joinKey, predicate),
enableAsync,
asyncThreadNumber);
}
Flink interface:
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
static LookupFunctionProvider of(LookupFunction lookupFunction) {
return () -> {
return lookupFunction;
};
}
LookupFunction createLookupFunction();
}
LookupFunctionProvider exposes a LookupFunction, and the main lookup logic lives inside that LookupFunction.
LookupFunction (Flink)
public abstract class LookupFunction extends TableFunction<RowData> {
public LookupFunction() {
}
// this is where the main lookup (join) logic has to be implemented
public abstract Collection<RowData> lookup(RowData var1) throws IOException;
public final void eval(Object... keys) {
GenericRowData keyRow = GenericRowData.of(keys);
try {
Collection<RowData> lookup = this.lookup(keyRow);
if (lookup != null) {
lookup.forEach(this::collect);
}
} catch (IOException var4) {
throw new RuntimeException(String.format("Failed to lookup values with given key row '%s'", keyRow), var4);
}
}
}
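To make the contract concrete before looking at Paimon's implementation, here is a minimal, purely illustrative LookupFunction backed by an in-memory map (not Paimon code):
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.LookupFunction;
/** Illustrative only: looks up a single value column by a single string key column. */
public class InMemoryLookupFunction extends LookupFunction {
    private final Map<String, String> dim = new HashMap<>();
    public InMemoryLookupFunction() {
        dim.put("k1", "v1");
    }
    @Override
    public Collection<RowData> lookup(RowData keyRow) throws IOException {
        String key = keyRow.getString(0).toString();
        String value = dim.get(key);
        if (value == null) {
            return Collections.emptyList();
        }
        // return zero or more matching rows; the framework collects them
        return Collections.singletonList(
                GenericRowData.of(StringData.fromString(key), StringData.fromString(value)));
    }
}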
Paimon's implementation of LookupFunction:
public class NewLookupFunction extends LookupFunction {
private static final long serialVersionUID = 1L;
private final FileStoreLookupFunction function;
public NewLookupFunction(FileStoreLookupFunction function) {
this.function = function;
}
@Override
public void open(FunctionContext context) throws Exception {
function.open(context);
}
@Override
public Collection<RowData> lookup(RowData keyRow) throws IOException {
return function.lookup(keyRow);
}
@Override
public void close() throws Exception {
function.close();
}
}
As you can see, the main logic sits in FileStoreLookupFunction:
public Collection<RowData> lookup(RowData keyRow) {
try {
checkRefresh();
List<InternalRow> results = lookupTable.get(new FlinkRowWrapper(keyRow));
List<RowData> rows = new ArrayList<>(results.size());
for (InternalRow matchedRow : results) {
rows.add(new FlinkRowData(matchedRow));
}
return rows;
} catch (OutOfRangeException e) {
reopen();
return lookup(keyRow);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
That covers the source-side implementation.
Sink interface
For the sink side, Paimon has to implement Flink's DynamicTableSink interface:
public interface DynamicTableSink {
ChangelogMode getChangelogMode(ChangelogMode var1);
SinkRuntimeProvider getSinkRuntimeProvider(Context var1);
DynamicTableSink copy();
String asSummaryString();
@PublicEvolving
public interface SinkRuntimeProvider {
}
@PublicEvolving
public interface DataStructureConverter extends RuntimeConverter {
@Nullable
Object toExternal(@Nullable Object var1);
}
@PublicEvolving
public interface Context {
boolean isBounded();
<T> TypeInformation<T> createTypeInformation(DataType var1);
<T> TypeInformation<T> createTypeInformation(LogicalType var1);
DataStructureConverter createDataStructureConverter(DataType var1);
Optional<int[][]> getTargetColumns();
}
}
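getChangelogMode tells the planner which row kinds the sink accepts. Purely as an illustration of that API (not a quote of Paimon's implementation), a primary-key style sink that consumes upserts and deletes could declare a mode like this:
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;
// Illustrative only: accept inserts, upserts (UPDATE_AFTER) and deletes,
// but no UPDATE_BEFORE, which a primary-key storage does not need.
ChangelogMode upsertMode =
        ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();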
Paimon's implementation:
public DynamicTableSink createDynamicTableSink(Context context) {
Table table = buildPaimonTable(context);
if (table instanceof FileStoreTable) {
storeTableLineage(
((FileStoreTable) table).catalogEnvironment().lineageMetaFactory(),
context,
(entity, lineageFactory) -> {
try (LineageMeta lineage =
lineageFactory.create(() -> Options.fromMap(table.options()))) {
lineage.saveSinkTableLineage(entity);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
return new FlinkTableSink(
context.getObjectIdentifier(),
table,
context,
createOptionalLogStoreFactory(context).orElse(null));
}
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
if (overwrite && !context.isBounded()) {
throw new UnsupportedOperationException(
"Paimon doesn't support streaming INSERT OVERWRITE.");
}
LogSinkProvider logSinkProvider = null;
if (logStoreTableFactory != null) {
logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
}
Options conf = Options.fromMap(table.options());
// Do not sink to log store when overwrite mode
final LogSinkFunction logSinkFunction =
overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
return new PaimonDataStreamSinkProvider(
(dataStream) ->
new FlinkSinkBuilder((FileStoreTable) table)
.withInput(
new DataStream<>(
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
.withLogSinkFunction(logSinkFunction)
.withOverwritePartition(overwrite ? staticPartitions : null)
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
.withBoundedInputStream(context.isBounded())
.build());
}
As with the source, Paimon needs to provide a SinkRuntimeProvider, and that is where the main logic lives.
SinkRuntimeProvider (Flink interface)
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
return this.consumeDataStream(dataStream);
}
/** @deprecated */
@Deprecated
default DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
throw new UnsupportedOperationException("This method is deprecated. Use consumeDataStream(ProviderContext, DataStream<RowData>) instead");
}
default Optional<Integer> getParallelism() {
return Optional.empty();
}
}
As you can see, implementing this interface means receiving a DataStream and returning a DataStreamSink; in other words, Paimon can keep chaining further operations onto the incoming DataStream.
public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {
private final Function<DataStream<RowData>, DataStreamSink<?>> producer;
public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) {
this.producer = producer;
}
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return producer.apply(dataStream);
}
}
The main logic lives in the producer function that getSinkRuntimeProvider (shown above) passes to PaimonDataStreamSinkProvider.
That producer's real work happens in FlinkSinkBuilder.build():
public DataStreamSink<?> build() {
DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
input =
input.forward()
.transform(
"local merge",
input.getType(),
new LocalMergeOperator(table.schema()))
.setParallelism(input.getParallelism());
}
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
case FIXED:
return buildForFixedBucket(input);
case DYNAMIC:
return buildDynamicBucketSink(input, false);
case GLOBAL_DYNAMIC:
return buildDynamicBucketSink(input, true);
case UNAWARE:
return buildUnawareBucketSink(input);
default:
throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
}
}
There is a different implementation for each bucket mode; this time we look at buildForFixedBucket.
private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
DataStream<InternalRow> partitioned =
partition(
input,
new RowDataChannelComputer(table.schema(), logSinkFunction != null),
parallelism);
FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, logSinkFunction);
return sink.sinkFrom(partitioned);
}
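The partition() call shuffles the stream so that all records of the same bucket land on the same writer subtask. Conceptually the routing works like the simplified sketch below; the helper names are made up, while Paimon's real RowDataChannelComputer hashes the bucket-key columns of each row:
/** Hypothetical helpers that only illustrate the idea of fixed-bucket routing. */
final class BucketRoutingSketch {
    static int bucketOf(Object[] bucketKeyValues, int numBuckets) {
        // hash the bucket-key columns and map them onto the fixed bucket count
        return Math.abs(java.util.Arrays.hashCode(bucketKeyValues) % numBuckets);
    }
    static int channelOf(int bucket, int numChannels) {
        // every record of one bucket is routed to the same downstream writer subtask
        return bucket % numChannels;
    }
}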
Next we step into the FixedBucketSink class:
public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
assertNoSinkMaterializer(input);
// do the actually writing action, no snapshot generated in this stage
DataStream<Committable> written = doWrite(input, initialCommitUser, input.getParallelism());
// commit the committable to generate a new snapshot
return doCommit(written, initialCommitUser);
}
As you can see, Paimon takes the incoming input stream, first chains a writer DataStream onto it, and then chains a commit DataStream after that.
public DataStream<Committable> doWrite(
DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming =
StreamExecutionEnvironmentUtils.getConfiguration(env)
.get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator<Committable> written =
input.transform(
(writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME)
+ " : "
+ table.name(),
new CommittableTypeInfo(),
createWriteOperator(
createWriteProvider(env.getCheckpointConfig(), isStreaming),
commitUser))
.setParallelism(parallelism == null ? input.getParallelism() : parallelism);
if (!isStreaming) {
assertBatchConfiguration(env, written.getParallelism());
}
Options options = Options.fromMap(table.options());
if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
return written;
}
Inside doWrite, a custom operator is wired in to write the bucketed data into Paimon and to handle compaction and related work.
protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
StreamExecutionEnvironment env = written.getExecutionEnvironment();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
boolean streamingCheckpointEnabled =
isStreaming && checkpointConfig.isCheckpointingEnabled();
if (streamingCheckpointEnabled) {
assertStreamingConfiguration(env);
}
OneInputStreamOperator<Committable, Committable> committerOperator =
new CommitterOperator<>(
streamingCheckpointEnabled,
commitUser,
createCommitterFactory(streamingCheckpointEnabled),
createCommittableStateManager());
if (Options.fromMap(table.options()).get(SINK_AUTO_TAG_FOR_SAVEPOINT)) {
committerOperator =
new AutoTagForSavepointCommitterOperator<>(
(CommitterOperator<Committable, ManifestCommittable>) committerOperator,
table::snapshotManager,
table::tagManager,
() -> table.store().newTagDeletion(),
() -> table.store().createTagCallbacks());
}
if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
&& table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) {
committerOperator =
new BatchWriteGeneratorTagOperator<>(
(CommitterOperator<Committable, ManifestCommittable>) committerOperator,
table);
}
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " : " + table.name(),
new CommittableTypeInfo(),
committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
Options options = Options.fromMap(table.options());
configureGlobalCommitter(
committed,
options.get(SINK_COMMITTER_CPU),
options.get(SINK_COMMITTER_MEMORY),
conf);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
The commit stage is likewise a custom operator, this one responsible for committing the metadata. The pipeline is then terminated with an empty DiscardingSink.
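In streaming mode the committer effectively follows a checkpoint-aligned two-phase-commit pattern: committables are buffered per checkpoint and only turned into a new snapshot once that checkpoint completes. The sketch below merely illustrates that pattern and ignores state handling, retries and tagging; it is not Paimon's CommitterOperator:
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
/** Illustration of the checkpoint-aligned commit pattern only; not the real operator. */
class SketchCommitter<CommitT> {
    private final NavigableMap<Long, List<CommitT>> pending = new TreeMap<>();
    void processElement(long checkpointId, CommitT committable) {
        // buffer committables produced by the writers, keyed by checkpoint id
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(committable);
    }
    void notifyCheckpointComplete(long completedCheckpointId) {
        // commit everything up to and including this checkpoint -> new snapshot(s)
        NavigableMap<Long, List<CommitT>> ready = pending.headMap(completedCheckpointId, true);
        ready.values().forEach(this::commit);
        ready.clear();
    }
    private void commit(List<CommitT> committables) {
        // in Paimon this is where manifest and snapshot metadata get written
    }
}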
Catalog
For Flink to manage Paimon's metadata, the connector also has to implement Flink's Catalog interface:
public interface Catalog {
// this method links the catalog to the table connector by returning the table's factory class
default Optional<Factory> getFactory() {
return Optional.empty();
}
/** @deprecated */
@Deprecated
default Optional<TableFactory> getTableFactory() {
return Optional.empty();
}
default Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
return Optional.empty();
}
void open() throws CatalogException;
void close() throws CatalogException;
String getDefaultDatabase() throws CatalogException;
List<String> listDatabases() throws CatalogException;
CatalogDatabase getDatabase(String var1) throws DatabaseNotExistException, CatalogException;
boolean databaseExists(String var1) throws CatalogException;
void createDatabase(String var1, CatalogDatabase var2, boolean var3) throws DatabaseAlreadyExistException, CatalogException;
default void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
this.dropDatabase(name, ignoreIfNotExists, false);
}
void dropDatabase(String var1, boolean var2, boolean var3) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;
void alterDatabase(String var1, CatalogDatabase var2, boolean var3) throws DatabaseNotExistException, CatalogException;
List<String> listTables(String var1) throws DatabaseNotExistException, CatalogException;
List<String> listViews(String var1) throws DatabaseNotExistException, CatalogException;
CatalogBaseTable getTable(ObjectPath var1) throws TableNotExistException, CatalogException;
boolean tableExists(ObjectPath var1) throws CatalogException;
void dropTable(ObjectPath var1, boolean var2) throws TableNotExistException, CatalogException;
void renameTable(ObjectPath var1, String var2, boolean var3) throws TableNotExistException, TableAlreadyExistException, CatalogException;
void createTable(ObjectPath var1, CatalogBaseTable var2, boolean var3) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
void alterTable(ObjectPath var1, CatalogBaseTable var2, boolean var3) throws TableNotExistException, CatalogException;
List<CatalogPartitionSpec> listPartitions(ObjectPath var1) throws TableNotExistException, TableNotPartitionedException, CatalogException;
List<CatalogPartitionSpec> listPartitions(ObjectPath var1, CatalogPartitionSpec var2) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;
List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath var1, List<Expression> var2) throws TableNotExistException, TableNotPartitionedException, CatalogException;
CatalogPartition getPartition(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;
boolean partitionExists(ObjectPath var1, CatalogPartitionSpec var2) throws CatalogException;
void createPartition(ObjectPath var1, CatalogPartitionSpec var2, CatalogPartition var3, boolean var4) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException;
void dropPartition(ObjectPath var1, CatalogPartitionSpec var2, boolean var3) throws PartitionNotExistException, CatalogException;
void alterPartition(ObjectPath var1, CatalogPartitionSpec var2, CatalogPartition var3, boolean var4) throws PartitionNotExistException, CatalogException;
List<String> listFunctions(String var1) throws DatabaseNotExistException, CatalogException;
CatalogFunction getFunction(ObjectPath var1) throws FunctionNotExistException, CatalogException;
boolean functionExists(ObjectPath var1) throws CatalogException;
void createFunction(ObjectPath var1, CatalogFunction var2, boolean var3) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;
void alterFunction(ObjectPath var1, CatalogFunction var2, boolean var3) throws FunctionNotExistException, CatalogException;
void dropFunction(ObjectPath var1, boolean var2) throws FunctionNotExistException, CatalogException;
CatalogTableStatistics getTableStatistics(ObjectPath var1) throws TableNotExistException, CatalogException;
CatalogColumnStatistics getTableColumnStatistics(ObjectPath var1) throws TableNotExistException, CatalogException;
CatalogTableStatistics getPartitionStatistics(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;
CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;
void alterTableStatistics(ObjectPath var1, CatalogTableStatistics var2, boolean var3) throws TableNotExistException, CatalogException;
void alterTableColumnStatistics(ObjectPath var1, CatalogColumnStatistics var2, boolean var3) throws TableNotExistException, CatalogException, TablePartitionedException;
void alterPartitionStatistics(ObjectPath var1, CatalogPartitionSpec var2, CatalogTableStatistics var3, boolean var4) throws PartitionNotExistException, CatalogException;
void alterPartitionColumnStatistics(ObjectPath var1, CatalogPartitionSpec var2, CatalogColumnStatistics var3, boolean var4) throws PartitionNotExistException, CatalogException;
}
Paimon's catalog implementation returns its table factory here:
public Optional<Factory> getFactory() {
return Optional.of(new FlinkTableFactory());
}
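For completeness, registering and using the Paimon catalog from the Table API looks roughly like the following (the warehouse path is a placeholder, and the paimon-flink bundle jar is assumed to be on the classpath):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class RegisterPaimonCatalog {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // the warehouse path below is a placeholder; point it at your own storage
        tEnv.executeSql(
                "CREATE CATALOG paimon_catalog WITH ("
                        + " 'type' = 'paimon',"
                        + " 'warehouse' = 'file:/tmp/paimon_warehouse'"
                        + ")");
        tEnv.executeSql("USE CATALOG paimon_catalog");
        // from here on, CREATE TABLE / INSERT / SELECT statements go through the
        // catalog, factory, source and sink implementations described above
    }
}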