Flink Paimon Connector: A Complete Walkthrough

https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

For Flink to connect to Paimon, the connector has to implement Flink's Source and Sink interfaces as well as the corresponding Catalog interface.

Source Interface

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/


For Flink to operate on Paimon, a DynamicTableSourceFactory has to be implemented:

public interface DynamicTableSourceFactory extends DynamicTableFactory {
    DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context);
}

The purpose of this interface is to create a DynamicTableSource. A user-defined source comes in two kinds: a ScanTableSource, the regular source used to read data out of a table, and a LookupTableSource, which backs the table being joined in a lookup join. If you want to lookup-join a Paimon table in Flink SQL, the latter has to be implemented as well.
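
For reference, here is a minimal factory sketch, not Paimon's code: the connector id "demo", the "path" option and the DemoScanTableSource class are all made up (a sketch of DemoScanTableSource follows in the ScanTableSource section below). It only shows the shape Flink expects from a DynamicTableSourceFactory.

import java.util.Collections;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class DemoTableSourceFactory implements DynamicTableSourceFactory {

    // hypothetical option of the imaginary "demo" connector
    public static final ConfigOption<String> PATH =
            ConfigOptions.key("path").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // matched against the 'connector' option; the factory itself is discovered
        // through SPI (META-INF/services/org.apache.flink.table.factories.Factory)
        return "demo";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(PATH);
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // validate the declared options, then hand them to the (hypothetical) source
        FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        return new DemoScanTableSource(helper.getOptions().get(PATH));
    }
}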

ScanTableSource

(Flink interface)

public interface ScanTableSource extends DynamicTableSource {
    ChangelogMode getChangelogMode();

    ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext);

    @PublicEvolving
    public interface ScanRuntimeProvider {
        boolean isBounded();
    }

    @PublicEvolving
    public interface ScanContext extends DynamicTableSource.Context {
    }
}

When implementing this interface, the main job is to provide a ScanRuntimeProvider.
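
For orientation, here is a bare-bones sketch of such a source, again not Paimon's code but the hypothetical DemoScanTableSource used by the factory sketch above: it declares insert-only changelog semantics and wraps a do-nothing legacy SourceFunction in a SourceFunctionProvider, one of the simplest ScanRuntimeProvider flavors.

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public class DemoScanTableSource implements ScanTableSource {

    private final String path; // hypothetical option carried over from the factory

    public DemoScanTableSource(String path) {
        this.path = path;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // a real implementation would read `path` here; this one emits nothing
        SourceFunction<RowData> empty =
                new SourceFunction<RowData>() {
                    @Override
                    public void run(SourceContext<RowData> ctx) {}

                    @Override
                    public void cancel() {}
                };
        return SourceFunctionProvider.of(empty, /* bounded */ true);
    }

    @Override
    public DynamicTableSource copy() {
        return new DemoScanTableSource(path);
    }

    @Override
    public String asSummaryString() {
        return "demo";
    }
}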

Paimon's implementation of this interface:

public abstract class BaseTableSource implements ScanTableSource {

    private final FlinkTableSource source; // Paimon wraps the concrete implementation, FlinkTableSource, inside BaseTableSource.

    public BaseTableSource(FlinkTableSource source) {
        this.source = source;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return source.getChangelogMode();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return source.getScanRuntimeProvider(runtimeProviderContext);
    }

    @Override
    public String asSummaryString() {
        return source.asSummaryString();
    }
}

FlinkTableSource (Paimon)

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    LogSourceProvider logSourceProvider = null;
    if (logStoreTableFactory != null) {
        logSourceProvider =
                logStoreTableFactory.createSourceProvider(context, scanContext, projectFields);
    }

    WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
    Options options = Options.fromMap(table.options());
    if (watermarkStrategy != null) {
        WatermarkEmitStrategy emitStrategy = options.get(SCAN_WATERMARK_EMIT_STRATEGY);
        if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
            watermarkStrategy = new OnEventWatermarkStrategy(watermarkStrategy);
        }
        Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
        if (idleTimeout != null) {
            watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
        }
        String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
        if (watermarkAlignGroup != null) {
            try {
                watermarkStrategy =
                        WatermarkAlignUtils.withWatermarkAlignment(
                                watermarkStrategy,
                                watermarkAlignGroup,
                                options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                                options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
            } catch (NoSuchMethodError error) {
                throw new RuntimeException(
                        "Flink 1.14 does not support watermark alignment, please check your Flink version.",
                        error);
            }
        }
    }

    FlinkSourceBuilder sourceBuilder =
            new FlinkSourceBuilder(tableIdentifier, table)
                    .withContinuousMode(streaming)
                    .withLogSourceProvider(logSourceProvider)
                    .withProjection(projectFields)
                    .withPredicate(predicate)
                    .withLimit(limit)
                    .withWatermarkStrategy(watermarkStrategy)
                    .withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);

    return new PaimonDataStreamScanProvider(
            !streaming, env -> configureSource(sourceBuilder, env));
}

private DataStream<RowData> configureSource(
            FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
        Options options = Options.fromMap(this.table.options());
        Configuration envConfig = (Configuration) env.getConfiguration();
        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
            options.set(
                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
        }
        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
            if (streaming) {
                parallelism = options.get(CoreOptions.BUCKET);
            } else {
                scanSplitsForInference();
                parallelism = splitStatistics.splitNumber();
                if (null != limit && limit > 0) {
                    int limitCount =
                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
                    parallelism = Math.min(parallelism, limitCount);
                }

                parallelism = Math.max(1, parallelism);
            }
            parallelism =
                    Math.min(
                            parallelism,
                            options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
        }

        return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
}

From the code above we can see that Paimon provides a PaimonDataStreamScanProvider class, which implements Flink's DataStreamScanProvider interface:

public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
    default DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        return this.produceDataStream(execEnv);
    }

    /** @deprecated */
    @Deprecated
    default DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        throw new UnsupportedOperationException("This method is deprecated. Use produceDataStream(ProviderContext, StreamExecutionEnvironment) instead");
    }
}

Implementing this interface comes down to taking the provided StreamExecutionEnvironment and returning a DataStream; that is all it takes to plug the source into Flink.
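
As a toy illustration of that contract (not Paimon's code), a DataStreamScanProvider only has to turn the environment it is handed into a DataStream<RowData>. The two hard-coded rows below are made up; a real connector would call env.fromSource(...) with its own Source instead.

import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class DemoDataStreamScanProvider implements DataStreamScanProvider {

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment env) {
        // emit two hard-coded rows; generic RowData type info is good enough for a demo
        return env.fromCollection(
                Arrays.asList(
                        (RowData) GenericRowData.of(StringData.fromString("a"), 1),
                        GenericRowData.of(StringData.fromString("b"), 2)),
                TypeInformation.of(RowData.class));
    }

    @Override
    public boolean isBounded() {
        return true;
    }
}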

Paimon created a FlinkSourceBuilder to construct this DataStream.

FlinkSourceBuilder (Paimon)

public DataStream<RowData> build() {
    if (env == null) {
        throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
    }

    if (isContinuous) {
        TableScanUtils.streamingReadingValidate(table);

        // TODO visit all options through CoreOptions
        StartupMode startupMode = CoreOptions.startupMode(conf);
        StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf);

        if (logSourceProvider != null && streamingReadMode != FILE) {
            if (startupMode != StartupMode.LATEST_FULL) {
                return toDataStream(logSourceProvider.createSource(null));
            } else {
                return toDataStream(
                        HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
                                        LogHybridSourceFactory.buildHybridFirstSource(
                                                table, projectedFields, predicate))
                                .addSource(
                                        new LogHybridSourceFactory(logSourceProvider),
                                        Boundedness.CONTINUOUS_UNBOUNDED)
                                .build());
            }
        } else {
            if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
                return buildAlignedContinuousFileSource();
            } else if (conf.contains(CoreOptions.CONSUMER_ID)
                    && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
                            == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
                return buildContinuousStreamOperator();
            } else {
                return buildContinuousFileSource();
            }
        }
    } else {
        return buildStaticFileSource();
    }
}

As you can see, Paimon builds different DataStreams depending on the configuration; here we only follow the most basic streaming-read source, buildContinuousFileSource.

private DataStream<RowData> buildContinuousFileSource() {
    return toDataStream(
            new ContinuousFileStoreSource(
                    createReadBuilder(),
                    table.options(),
                    limit,
                    table instanceof FileStoreTable
                            ? ((FileStoreTable) table).bucketMode()
                            : BucketMode.FIXED));
}

private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
    DataStreamSource<RowData> dataStream =
            env.fromSource(
                    source,
                    watermarkStrategy == null
                            ? WatermarkStrategy.noWatermarks()
                            : watermarkStrategy,
                    tableIdentifier.asSummaryString(),
                    produceTypeInfo());
    if (parallelism != null) {
        dataStream.setParallelism(parallelism);
    }
    return dataStream;
}

From the code above we can see that Paimon creates a ContinuousFileStoreSource.

ContinuousFileStoreSource extends FlinkSource, which in turn implements Flink's Source interface:

public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends SourceReaderFactory<T, SplitT> {
    Boundedness getBoundedness();

    SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext) throws Exception;

    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception;

    SimpleVersionedSerializer<SplitT> getSplitSerializer();

    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}

A Source consists of two main components:

  • SplitEnumerator: discovers and assigns splits (a split can be a file, a partition, and so on).
  • SourceReader: reads the actual data from the splits it is assigned.

Paimon implements the reader as FileStoreSourceReader:

public class FileStoreSourceReader
        extends SingleThreadMultiplexSourceReaderBase<
                RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {

    private final IOManager ioManager;

    private long lastConsumeSnapshotId = Long.MIN_VALUE;

    public FileStoreSourceReader(
            SourceReaderContext readerContext,
            TableRead tableRead,
            FileStoreSourceReaderMetrics metrics,
            IOManager ioManager,
            @Nullable Long limit) {
        // limiter is created in SourceReader, it can be shared in all split readers
        super(
                () ->
                        new FileStoreSourceSplitReader(
                                tableRead, RecordLimiter.create(limit), metrics),
                (element, output, state) ->
                        FlinkRecordsWithSplitIds.emitRecord(element, output, state, metrics),
                readerContext.getConfiguration(),
                readerContext);
        this.ioManager = ioManager;
    }

    public FileStoreSourceReader(
            SourceReaderContext readerContext,
            TableRead tableRead,
            FileStoreSourceReaderMetrics metrics,
            IOManager ioManager,
            @Nullable Long limit,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
                    elementsQueue) {
        super(
                elementsQueue,
                () ->
                        new FileStoreSourceSplitReader(
                                tableRead, RecordLimiter.create(limit), metrics),
                (element, output, state) ->
                        FlinkRecordsWithSplitIds.emitRecord(element, output, state, metrics),
                readerContext.getConfiguration(),
                readerContext);
        this.ioManager = ioManager;
    }

    @Override
    public void start() {
        // we request a split only if we did not get splits during the checkpoint restore
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }

    @Override
    protected FileStoreSourceSplitState initializedState(FileStoreSourceSplit split) {
        return new FileStoreSourceSplitState(split);
    }

    @Override
    protected FileStoreSourceSplit toSplitType(
            String splitId, FileStoreSourceSplitState splitState) {
        return splitState.toSourceSplit();
    }

    @Override
    public void close() throws Exception {
        super.close();
        ioManager.close();
    }
}

as well as the ContinuousFileSplitEnumerator class (only the declaration is shown here):

public class ContinuousFileSplitEnumerator
        implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
    // ...
}

That is the code behind the Flink-to-Paimon scan source.

LookupTableSource

LookupTableSource is the interface to implement for tables that serve as the lookup side of a lookup join. At runtime, a LookupTableSource looks up rows in the external storage system by key.
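
To put the interface in context, a lookup join in Flink SQL looks roughly like the sketch below. All table and column names are made up; the FOR SYSTEM_TIME AS OF clause on the probe side's processing-time attribute is what makes the planner route the dimension table through LookupTableSource.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupJoinExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // assumes an `orders` table with a processing-time attribute `proc_time`
        // and a Paimon table `dim_customers` have already been registered
        tEnv.executeSql(
                "SELECT o.order_id, c.customer_name "
                        + "FROM orders AS o "
                        + "JOIN dim_customers FOR SYSTEM_TIME AS OF o.proc_time AS c "
                        + "ON o.customer_id = c.customer_id");
    }
}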

LookupTableSource (Flink)

public interface LookupTableSource extends DynamicTableSource {
    LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

    @PublicEvolving
    public interface LookupRuntimeProvider {
    }

    @PublicEvolving
    public interface LookupContext extends DynamicTableSource.Context {
        int[][] getKeys();
    }
}

Paimon's DataTableSource implements this interface.

DataTableSource

public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    if (limit != null) {
        throw new RuntimeException(
                "Limit push down should not happen in Lookup source, but it is " + limit);
    }
    int[] projection =
            projectFields == null
                    ? IntStream.range(0, table.rowType().getFieldCount()).toArray()
                    : Projection.of(projectFields).toTopLevelIndexes();
    int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
    Options options = new Options(table.options());
    boolean enableAsync = options.get(LOOKUP_ASYNC);
    int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER);
    return LookupRuntimeProviderFactory.create(
            new FileStoreLookupFunction(table, projection, joinKey, predicate),
            enableAsync,
            asyncThreadNumber);
}

Flink interface:

public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> {
            return lookupFunction;
        };
    }

    LookupFunction createLookupFunction();
}

When implementing LookupFunctionProvider, the main logic sits inside the LookupFunction it returns.

LookupFunction (Flink)

public abstract class LookupFunction extends TableFunction<RowData> {
    public LookupFunction() {
    }
    // The main lookup (join) logic to implement goes here
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    public final void eval(Object... keys) {
        GenericRowData keyRow = GenericRowData.of(keys);

        try {
            Collection<RowData> lookup = this.lookup(keyRow);
            if (lookup != null) {
                lookup.forEach(this::collect);
            }
        } catch (IOException e) {
            throw new RuntimeException(String.format("Failed to lookup values with given key row '%s'", keyRow), e);
        }
    }
}
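
Before looking at Paimon's version, here is a toy LookupFunction backed by an in-memory map (entirely made up, not Paimon's code) just to show the contract: lookup() receives the join-key row and returns the matching rows.

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;

public class MapLookupFunction extends LookupFunction {

    private transient Map<Integer, String> dim;

    @Override
    public void open(FunctionContext context) {
        // a real implementation would open its storage or cache here
        dim = new HashMap<>();
        dim.put(1, "alice");
        dim.put(2, "bob");
    }

    @Override
    public Collection<RowData> lookup(RowData keyRow) {
        String name = dim.get(keyRow.getInt(0));
        if (name == null) {
            return Collections.emptyList();
        }
        return Collections.<RowData>singletonList(
                GenericRowData.of(keyRow.getInt(0), StringData.fromString(name)));
    }
}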

Paimon's implementation of LookupFunction:

public class NewLookupFunction extends LookupFunction {

    private static final long serialVersionUID = 1L;

    private final FileStoreLookupFunction function;

    public NewLookupFunction(FileStoreLookupFunction function) {
        this.function = function;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        function.open(context);
    }

    @Override
    public Collection<RowData> lookup(RowData keyRow) throws IOException {
        return function.lookup(keyRow);
    }

    @Override
    public void close() throws Exception {
        function.close();
    }
}

As you can see, the main logic lives in FileStoreLookupFunction:

public Collection<RowData> lookup(RowData keyRow) {
    try {
        checkRefresh();
        List<InternalRow> results = lookupTable.get(new FlinkRowWrapper(keyRow));
        List<RowData> rows = new ArrayList<>(results.size());
        for (InternalRow matchedRow : results) {
            rows.add(new FlinkRowData(matchedRow));
        }
        return rows;
    } catch (OutOfRangeException e) {
        reopen();
        return lookup(keyRow);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

That covers the Source side of the connector.

Sink Interface

For the sink side, Paimon has to implement Flink's DynamicTableSink interface:

public interface DynamicTableSink {
    ChangelogMode getChangelogMode(ChangelogMode var1);

    SinkRuntimeProvider getSinkRuntimeProvider(Context var1);

    DynamicTableSink copy();

    String asSummaryString();

    @PublicEvolving
    public interface SinkRuntimeProvider {
    }

    @PublicEvolving
    public interface DataStructureConverter extends RuntimeConverter {
        @Nullable
        Object toExternal(@Nullable Object var1);
    }

    @PublicEvolving
    public interface Context {
        boolean isBounded();

        <T> TypeInformation<T> createTypeInformation(DataType var1);

        <T> TypeInformation<T> createTypeInformation(LogicalType var1);

        DataStructureConverter createDataStructureConverter(DataType var1);

        Optional<int[][]> getTargetColumns();
    }
}

Paimon's implementation:

public DynamicTableSink createDynamicTableSink(Context context) {
    Table table = buildPaimonTable(context);
    if (table instanceof FileStoreTable) {
        storeTableLineage(
                ((FileStoreTable) table).catalogEnvironment().lineageMetaFactory(),
                context,
                (entity, lineageFactory) -> {
                    try (LineageMeta lineage =
                            lineageFactory.create(() -> Options.fromMap(table.options()))) {
                        lineage.saveSinkTableLineage(entity);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
    }
    return new FlinkTableSink(
            context.getObjectIdentifier(),
            table,
            context,
            createOptionalLogStoreFactory(context).orElse(null));
}
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    if (overwrite && !context.isBounded()) {
        throw new UnsupportedOperationException(
                "Paimon doesn't support streaming INSERT OVERWRITE.");
    }

    LogSinkProvider logSinkProvider = null;
    if (logStoreTableFactory != null) {
        logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
    }

    Options conf = Options.fromMap(table.options());
    // Do not sink to log store when overwrite mode
    final LogSinkFunction logSinkFunction =
            overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
    return new PaimonDataStreamSinkProvider(
            (dataStream) ->
                    new FlinkSinkBuilder((FileStoreTable) table)
                            .withInput(
                                    new DataStream<>(
                                            dataStream.getExecutionEnvironment(),
                                            dataStream.getTransformation()))
                            .withLogSinkFunction(logSinkFunction)
                            .withOverwritePartition(overwrite ? staticPartitions : null)
                            .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
                            .withBoundedInputStream(context.isBounded())
                            .build());
}

Similar to the Source side, Paimon has to implement a SinkRuntimeProvider, and that is where the main logic lives.

SinkRuntimeProvider (Flink interface)

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
        return this.consumeDataStream(dataStream);
    }

    /** @deprecated */
    @Deprecated
    default DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        throw new UnsupportedOperationException("This method is deprecated. Use consumeDataStream(ProviderContext, DataStream<RowData>) instead");
    }

    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

As you can see, implementing this interface means receiving a DataStream and returning a DataStreamSink; in other words, Paimon is free to keep appending its own operators to the incoming DataStream.
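
A toy illustration of that contract, not Paimon's code: take the planner-provided DataStream, chain whatever operators you need, and finish with a DataStreamSink. Here the whole "sink" is just a print operator.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

public class DemoPrintSinkProvider implements DataStreamSinkProvider {

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        // a real connector (Paimon included) appends its writer and committer
        // operators here before terminating the stream
        return dataStream.print().name("demo-print-sink");
    }
}

Paimon's own provider, PaimonDataStreamSinkProvider, follows the same shape: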

public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {

    private final Function<DataStream<RowData>, DataStreamSink<?>> producer;

    public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) {
        this.producer = producer;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        return producer.apply(dataStream);
    }
}

The main logic lives in the producer, i.e. the lambda that getSinkRuntimeProvider (shown above) passes to PaimonDataStreamSinkProvider, and the producer's work all ends up in FlinkSinkBuilder.build():

public DataStreamSink<?> build() {
    DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
    if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
        input =
                input.forward()
                        .transform(
                                "local merge",
                                input.getType(),
                                new LocalMergeOperator(table.schema()))
                        .setParallelism(input.getParallelism());
    }

    BucketMode bucketMode = table.bucketMode();
    switch (bucketMode) {
        case FIXED:
            return buildForFixedBucket(input);
        case DYNAMIC:
            return buildDynamicBucketSink(input, false);
        case GLOBAL_DYNAMIC:
            return buildDynamicBucketSink(input, true);
        case UNAWARE:
            return buildUnawareBucketSink(input);
        default:
            throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
    }
}

Different bucket modes lead to different implementations; this time we look at the buildForFixedBucket method.

private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
    DataStream<InternalRow> partitioned =
            partition(
                    input,
                    new RowDataChannelComputer(table.schema(), logSinkFunction != null),
                    parallelism);
    FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, logSinkFunction);
    return sink.sinkFrom(partitioned);
}
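
Conceptually, and this is only a simplified sketch rather than Paimon's actual RowDataChannelComputer, fixed-bucket routing hashes the bucket key and takes it modulo the configured bucket count, so records with the same key always land in the same bucket and therefore in the same writer subtask.

import java.util.Objects;

public class FixedBucketRoutingSketch {

    // simplified stand-in for mapping a record's bucket key to a bucket id
    static int bucket(Object[] bucketKey, int numBuckets) {
        int hash = Objects.hash(bucketKey);
        return Math.abs(hash % numBuckets);
    }

    public static void main(String[] args) {
        System.out.println(bucket(new Object[] {"user-1"}, 4));
        System.out.println(bucket(new Object[] {"user-1"}, 4)); // same bucket as above
        System.out.println(bucket(new Object[] {"user-2"}, 4)); // possibly a different bucket
    }
}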

Next we step into the FixedBucketSink class:

public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
    assertNoSinkMaterializer(input);

    // do the actually writing action, no snapshot generated in this stage
    DataStream<Committable> written = doWrite(input, initialCommitUser, input.getParallelism());

    // commit the committable to generate a new snapshot
    return doCommit(written, initialCommitUser);
}

As you can see, starting from the incoming input stream Paimon first attaches a writer DataStream and then a commit DataStream.

public DataStream<Committable> doWrite(
        DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
    StreamExecutionEnvironment env = input.getExecutionEnvironment();
    boolean isStreaming =
            StreamExecutionEnvironmentUtils.getConfiguration(env)
                            .get(ExecutionOptions.RUNTIME_MODE)
                    == RuntimeExecutionMode.STREAMING;

    boolean writeOnly = table.coreOptions().writeOnly();
    SingleOutputStreamOperator<Committable> written =
            input.transform(
                            (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME)
                                    + " : "
                                    + table.name(),
                            new CommittableTypeInfo(),
                            createWriteOperator(
                                    createWriteProvider(env.getCheckpointConfig(), isStreaming),
                                    commitUser))
                    .setParallelism(parallelism == null ? input.getParallelism() : parallelism);

    if (!isStreaming) {
        assertBatchConfiguration(env, written.getParallelism());
    }

    Options options = Options.fromMap(table.options());
    if (options.get(SINK_USE_MANAGED_MEMORY)) {
        declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
    }
    return written;
}

Inside doWrite a custom operator is created that writes the bucketed data into Paimon and also takes care of compaction and related work.

protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
    StreamExecutionEnvironment env = written.getExecutionEnvironment();
    ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    boolean isStreaming =
            conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
    boolean streamingCheckpointEnabled =
            isStreaming && checkpointConfig.isCheckpointingEnabled();
    if (streamingCheckpointEnabled) {
        assertStreamingConfiguration(env);
    }

    OneInputStreamOperator<Committable, Committable> committerOperator =
            new CommitterOperator<>(
                    streamingCheckpointEnabled,
                    commitUser,
                    createCommitterFactory(streamingCheckpointEnabled),
                    createCommittableStateManager());
    if (Options.fromMap(table.options()).get(SINK_AUTO_TAG_FOR_SAVEPOINT)) {
        committerOperator =
                new AutoTagForSavepointCommitterOperator<>(
                        (CommitterOperator<Committable, ManifestCommittable>) committerOperator,
                        table::snapshotManager,
                        table::tagManager,
                        () -> table.store().newTagDeletion(),
                        () -> table.store().createTagCallbacks());
    }
    if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
            && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) {
        committerOperator =
                new BatchWriteGeneratorTagOperator<>(
                        (CommitterOperator<Committable, ManifestCommittable>) committerOperator,
                        table);
    }
    SingleOutputStreamOperator<?> committed =
            written.transform(
                            GLOBAL_COMMITTER_NAME + " : " + table.name(),
                            new CommittableTypeInfo(),
                            committerOperator)
                    .setParallelism(1)
                    .setMaxParallelism(1);
    Options options = Options.fromMap(table.options());
    configureGlobalCommitter(
            committed,
            options.get(SINK_COMMITTER_CPU),
            options.get(SINK_COMMITTER_MEMORY),
            conf);
    return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

The commit step is likewise a custom operator that commits the metadata. Finally an empty DiscardingSink is attached to terminate the pipeline.

Catalog

For Flink to work with Paimon's metadata, the Catalog interface has to be implemented as well:

public interface Catalog {
    // Links the catalog to the table connector: returns the table's factory class
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }

    /** @deprecated */
    @Deprecated
    default Optional<TableFactory> getTableFactory() {
        return Optional.empty();
    }

    default Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
        return Optional.empty();
    }

    void open() throws CatalogException;

    void close() throws CatalogException;

    String getDefaultDatabase() throws CatalogException;

    List<String> listDatabases() throws CatalogException;

    CatalogDatabase getDatabase(String var1) throws DatabaseNotExistException, CatalogException;

    boolean databaseExists(String var1) throws CatalogException;

    void createDatabase(String var1, CatalogDatabase var2, boolean var3) throws DatabaseAlreadyExistException, CatalogException;

    default void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
        this.dropDatabase(name, ignoreIfNotExists, false);
    }

    void dropDatabase(String var1, boolean var2, boolean var3) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;

    void alterDatabase(String var1, CatalogDatabase var2, boolean var3) throws DatabaseNotExistException, CatalogException;

    List<String> listTables(String var1) throws DatabaseNotExistException, CatalogException;

    List<String> listViews(String var1) throws DatabaseNotExistException, CatalogException;

    CatalogBaseTable getTable(ObjectPath var1) throws TableNotExistException, CatalogException;

    boolean tableExists(ObjectPath var1) throws CatalogException;

    void dropTable(ObjectPath var1, boolean var2) throws TableNotExistException, CatalogException;

    void renameTable(ObjectPath var1, String var2, boolean var3) throws TableNotExistException, TableAlreadyExistException, CatalogException;

    void createTable(ObjectPath var1, CatalogBaseTable var2, boolean var3) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

    void alterTable(ObjectPath var1, CatalogBaseTable var2, boolean var3) throws TableNotExistException, CatalogException;

    List<CatalogPartitionSpec> listPartitions(ObjectPath var1) throws TableNotExistException, TableNotPartitionedException, CatalogException;

    List<CatalogPartitionSpec> listPartitions(ObjectPath var1, CatalogPartitionSpec var2) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;

    List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath var1, List<Expression> var2) throws TableNotExistException, TableNotPartitionedException, CatalogException;

    CatalogPartition getPartition(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;

    boolean partitionExists(ObjectPath var1, CatalogPartitionSpec var2) throws CatalogException;

    void createPartition(ObjectPath var1, CatalogPartitionSpec var2, CatalogPartition var3, boolean var4) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException;

    void dropPartition(ObjectPath var1, CatalogPartitionSpec var2, boolean var3) throws PartitionNotExistException, CatalogException;

    void alterPartition(ObjectPath var1, CatalogPartitionSpec var2, CatalogPartition var3, boolean var4) throws PartitionNotExistException, CatalogException;

    List<String> listFunctions(String var1) throws DatabaseNotExistException, CatalogException;

    CatalogFunction getFunction(ObjectPath var1) throws FunctionNotExistException, CatalogException;

    boolean functionExists(ObjectPath var1) throws CatalogException;

    void createFunction(ObjectPath var1, CatalogFunction var2, boolean var3) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;

    void alterFunction(ObjectPath var1, CatalogFunction var2, boolean var3) throws FunctionNotExistException, CatalogException;

    void dropFunction(ObjectPath var1, boolean var2) throws FunctionNotExistException, CatalogException;

    CatalogTableStatistics getTableStatistics(ObjectPath var1) throws TableNotExistException, CatalogException;

    CatalogColumnStatistics getTableColumnStatistics(ObjectPath var1) throws TableNotExistException, CatalogException;

    CatalogTableStatistics getPartitionStatistics(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;

    CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;

    void alterTableStatistics(ObjectPath var1, CatalogTableStatistics var2, boolean var3) throws TableNotExistException, CatalogException;

    void alterTableColumnStatistics(ObjectPath var1, CatalogColumnStatistics var2, boolean var3) throws TableNotExistException, CatalogException, TablePartitionedException;

    void alterPartitionStatistics(ObjectPath var1, CatalogPartitionSpec var2, CatalogTableStatistics var3, boolean var4) throws PartitionNotExistException, CatalogException;

    void alterPartitionColumnStatistics(ObjectPath var1, CatalogPartitionSpec var2, CatalogColumnStatistics var3, boolean var4) throws PartitionNotExistException, CatalogException;
}
Paimon's catalog implementation returns its FlinkTableFactory from getFactory, which is how tables resolved through this catalog end up using the source and sink factories described above:

public Optional<Factory> getFactory() {
    return Optional.of(new FlinkTableFactory());
}
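
Finally, a usage sketch to tie things together: once the Paimon catalog is registered in Flink SQL, DDL and DML against it are resolved through the Catalog and FlinkTableFactory implementations above. The warehouse path and table name below are only examples.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PaimonCatalogExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ("
                        + " 'type' = 'paimon',"
                        + " 'warehouse' = 'file:///tmp/paimon_warehouse'"
                        + ")");
        tEnv.executeSql("USE CATALOG paimon");
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS t ("
                        + " id INT,"
                        + " name STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ")");
        // the INSERT goes through the FlinkTableSink / FlinkSinkBuilder path described earlier
        tEnv.executeSql("INSERT INTO t VALUES (1, 'a'), (2, 'b')").await();
    }
}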
