A Deep Dive into the Flink Paimon Connector

https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

To connect Flink to Paimon, the corresponding Flink Source and Sink interfaces, as well as the Catalog interface, must be implemented.

Source Interface

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/

image-20240212113428844.png

For Flink to operate on Paimon, a DynamicTableSourceFactory must be implemented:

public interface DynamicTableSourceFactory extends DynamicTableFactory {
    DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context var1);
}

The purpose of this interface is to create a DynamicTableSource. A user-defined source comes in two flavors: a ScanTableSource, the regular source used to read data from a table, and a LookupTableSource, which backs the table side of a lookup join. If you want to lookup-join a Paimon table in Flink SQL, the latter must also be implemented.
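
To make the factory contract concrete, here is a minimal, self-contained sketch of a DynamicTableSourceFactory that hands back a bare ScanTableSource. This is not Paimon's code: the identifier "demo-source" and the class name are made up for illustration.

import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

public class DemoSourceFactory implements DynamicTableSourceFactory {

    @Override
    public String factoryIdentifier() {
        return "demo-source"; // matched against the 'connector' option in the DDL
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // A real connector reads context.getCatalogTable().getOptions() and the resolved
        // schema here; this sketch just returns a bare insert-only ScanTableSource.
        return new ScanTableSource() {
            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }

            @Override
            public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
                throw new UnsupportedOperationException("omitted in this sketch");
            }

            @Override
            public DynamicTableSource copy() {
                return this;
            }

            @Override
            public String asSummaryString() {
                return "demo-source";
            }
        };
    }
}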

ScanTableSource

(Flink interface)

public interface ScanTableSource extends DynamicTableSource {
    ChangelogMode getChangelogMode();

    ScanRuntimeProvider getScanRuntimeProvider(ScanContext var1);

    @PublicEvolving
    public interface ScanRuntimeProvider {
        boolean isBounded();
    }

    @PublicEvolving
    public interface ScanContext extends DynamicTableSource.Context {
    }
}

When implementing this interface, the main task is to provide a ScanRuntimeProvider.

Paimon's implementation of this interface:

public abstract class BaseTableSource implements ScanTableSource {

    private final FlinkTableSource source; // Paimon wraps the concrete implementation, FlinkTableSource, inside BaseTableSource.

    public BaseTableSource(FlinkTableSource source) {
        this.source = source;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return source.getChangelogMode();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return source.getScanRuntimeProvider(runtimeProviderContext);
    }

    @Override
    public String asSummaryString() {
        return source.asSummaryString();
    }
}

FlinkTableSource(Paimon)

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    LogSourceProvider logSourceProvider = null;
    if (logStoreTableFactory != null) {
        logSourceProvider =
                logStoreTableFactory.createSourceProvider(context, scanContext, projectFields);
    }

    WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
    Options options = Options.fromMap(table.options());
    if (watermarkStrategy != null) {
        WatermarkEmitStrategy emitStrategy = options.get(SCAN_WATERMARK_EMIT_STRATEGY);
        if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
            watermarkStrategy = new OnEventWatermarkStrategy(watermarkStrategy);
        }
        Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
        if (idleTimeout != null) {
            watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
        }
        String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
        if (watermarkAlignGroup != null) {
            try {
                watermarkStrategy =
                        WatermarkAlignUtils.withWatermarkAlignment(
                                watermarkStrategy,
                                watermarkAlignGroup,
                                options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                                options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
            } catch (NoSuchMethodError error) {
                throw new RuntimeException(
                        "Flink 1.14 does not support watermark alignment, please check your Flink version.",
                        error);
            }
        }
    }

    FlinkSourceBuilder sourceBuilder =
            new FlinkSourceBuilder(tableIdentifier, table)
                    .withContinuousMode(streaming)
                    .withLogSourceProvider(logSourceProvider)
                    .withProjection(projectFields)
                    .withPredicate(predicate)
                    .withLimit(limit)
                    .withWatermarkStrategy(watermarkStrategy)
                    .withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);

    return new PaimonDataStreamScanProvider(
            !streaming, env -> configureSource(sourceBuilder, env));
}

private DataStream<RowData> configureSource(
            FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
        Options options = Options.fromMap(this.table.options());
        Configuration envConfig = (Configuration) env.getConfiguration();
        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
            options.set(
                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
        }
        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
            if (streaming) {
                parallelism = options.get(CoreOptions.BUCKET);
            } else {
                scanSplitsForInference();
                parallelism = splitStatistics.splitNumber();
                if (null != limit && limit > 0) {
                    int limitCount =
                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
                    parallelism = Math.min(parallelism, limitCount);
                }

                parallelism = Math.max(1, parallelism);
            }
            parallelism =
                    Math.min(
                            parallelism,
                            options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
        }

        return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
}

As the code above shows, Paimon provides a PaimonDataStreamScanProvider class, which implements Flink's DataStreamScanProvider interface:

public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
    default DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        return this.produceDataStream(execEnv);
    }

    /** @deprecated */
    @Deprecated
    default DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        throw new UnsupportedOperationException("This method is deprecated. Use produceDataStream(ProviderContext, StreamExecutionEnvironment) instead");
    }
}

To implement this interface, the method receives a StreamExecutionEnvironment and returns a DataStream; that is how the source gets wired into Flink.
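
Paimon's provider for this is PaimonDataStreamScanProvider. Based on the constructor call shown above (a bounded flag plus a function from the environment to a DataStream), a minimal sketch of such a provider could look like the following; the actual Paimon class may differ in details.

import java.util.function.Function;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

public class PaimonDataStreamScanProvider implements DataStreamScanProvider {

    private final boolean bounded;
    private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer;

    public PaimonDataStreamScanProvider(
            boolean bounded, Function<StreamExecutionEnvironment, DataStream<RowData>> producer) {
        this.bounded = bounded;
        this.producer = producer;
    }

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        // Hand the environment to the builder function, which attaches the Paimon source
        // (env.fromSource(...)) and returns the resulting DataStream.
        return producer.apply(execEnv);
    }

    @Override
    public boolean isBounded() {
        return bounded;
    }
}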

Paimon uses a FlinkSourceBuilder to construct this DataStream.

FlinkSourceBuilder(Paimon)

public DataStream<RowData> build() {
    if (env == null) {
        throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
    }

    if (isContinuous) {
        TableScanUtils.streamingReadingValidate(table);

        // TODO visit all options through CoreOptions
        StartupMode startupMode = CoreOptions.startupMode(conf);
        StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf);

        if (logSourceProvider != null && streamingReadMode != FILE) {
            if (startupMode != StartupMode.LATEST_FULL) {
                return toDataStream(logSourceProvider.createSource(null));
            } else {
                return toDataStream(
                        HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
                                        LogHybridSourceFactory.buildHybridFirstSource(
                                                table, projectedFields, predicate))
                                .addSource(
                                        new LogHybridSourceFactory(logSourceProvider),
                                        Boundedness.CONTINUOUS_UNBOUNDED)
                                .build());
            }
        } else {
            if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
                return buildAlignedContinuousFileSource();
            } else if (conf.contains(CoreOptions.CONSUMER_ID)
                    && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
                            == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
                return buildContinuousStreamOperator();
            } else {
                return buildContinuousFileSource();
            }
        }
    } else {
        return buildStaticFileSource();
    }
}

As you can see, Paimon creates different DataStreams depending on the configuration. Here we only look at the most basic streaming-read source, buildContinuousFileSource.

private DataStream<RowData> buildContinuousFileSource() {
    return toDataStream(
            new ContinuousFileStoreSource(
                    createReadBuilder(),
                    table.options(),
                    limit,
                    table instanceof FileStoreTable
                            ? ((FileStoreTable) table).bucketMode()
                            : BucketMode.FIXED));
}

private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
    DataStreamSource<RowData> dataStream =
            env.fromSource(
                    source,
                    watermarkStrategy == null
                            ? WatermarkStrategy.noWatermarks()
                            : watermarkStrategy,
                    tableIdentifier.asSummaryString(),
                    produceTypeInfo());
    if (parallelism != null) {
        dataStream.setParallelism(parallelism);
    }
    return dataStream;
}

From the code above we can see that Paimon creates a ContinuousFileStoreSource.

ContinuousFileStoreSource extends FlinkSource, and FlinkSource in turn implements Flink's Source interface:

public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends SourceReaderFactory<T, SplitT> {
    Boundedness getBoundedness();

    SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> var1) throws Exception;

    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> var1, EnumChkT var2) throws Exception;

    SimpleVersionedSerializer<SplitT> getSplitSerializer();

    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}

A Source consists of two main components:

  • SplitEnumerator: discovers and assigns splits (a split can be a file, a partition, and so on).
  • Reader: reads the actual data from the splits.

Paimon implements the reader side in FileStoreSourceReader:

public class FileStoreSourceReader
        extends SingleThreadMultiplexSourceReaderBase<
                RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {

    private final IOManager ioManager;

    private long lastConsumeSnapshotId = Long.MIN_VALUE;

    public FileStoreSourceReader(
            SourceReaderContext readerContext,
            TableRead tableRead,
            FileStoreSourceReaderMetrics metrics,
            IOManager ioManager,
            @Nullable Long limit) {
        // limiter is created in SourceReader, it can be shared in all split readers
        super(
                () ->
                        new FileStoreSourceSplitReader(
                                tableRead, RecordLimiter.create(limit), metrics),
                (element, output, state) ->
                        FlinkRecordsWithSplitIds.emitRecord(element, output, state, metrics),
                readerContext.getConfiguration(),
                readerContext);
        this.ioManager = ioManager;
    }

    public FileStoreSourceReader(
            SourceReaderContext readerContext,
            TableRead tableRead,
            FileStoreSourceReaderMetrics metrics,
            IOManager ioManager,
            @Nullable Long limit,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
                    elementsQueue) {
        super(
                elementsQueue,
                () ->
                        new FileStoreSourceSplitReader(
                                tableRead, RecordLimiter.create(limit), metrics),
                (element, output, state) ->
                        FlinkRecordsWithSplitIds.emitRecord(element, output, state, metrics),
                readerContext.getConfiguration(),
                readerContext);
        this.ioManager = ioManager;
    }

    @Override
    public void start() {
        // we request a split only if we did not get splits during the checkpoint restore
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }

    @Override
    protected FileStoreSourceSplitState initializedState(FileStoreSourceSplit split) {
        return new FileStoreSourceSplitState(split);
    }

    @Override
    protected FileStoreSourceSplit toSplitType(
            String splitId, FileStoreSourceSplitState splitState) {
        return splitState.toSourceSplit();
    }

    @Override
    public void close() throws Exception {
        super.close();
        ioManager.close();
    }
}

as well as the enumerator side in the ContinuousFileSplitEnumerator class:

public class ContinuousFileSplitEnumerator
        implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
    // ...
}

The above covers the code path through which Flink connects to Paimon's scan source.

LookupTableSource

LookupTableSource is the interface implemented so that a table can serve as the dimension side of a lookup join (in Flink SQL, JOIN ... FOR SYSTEM_TIME AS OF ...). At runtime, a LookupTableSource looks up rows in the external storage system by key.

LookupTableSource(Flink)

public interface LookupTableSource extends DynamicTableSource {
    LookupRuntimeProvider getLookupRuntimeProvider(LookupContext var1);

    @PublicEvolving
    public interface LookupRuntimeProvider {
    }

    @PublicEvolving
    public interface LookupContext extends DynamicTableSource.Context {
        int[][] getKeys();
    }
}

Paimon's DataTableSource implements this interface.

DataTableSource

public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    if (limit != null) {
        throw new RuntimeException(
                "Limit push down should not happen in Lookup source, but it is " + limit);
    }
    int[] projection =
            projectFields == null
                    ? IntStream.range(0, table.rowType().getFieldCount()).toArray()
                    : Projection.of(projectFields).toTopLevelIndexes();
    int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
    Options options = new Options(table.options());
    boolean enableAsync = options.get(LOOKUP_ASYNC);
    int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER);
    return LookupRuntimeProviderFactory.create(
            new FileStoreLookupFunction(table, projection, joinKey, predicate),
            enableAsync,
            asyncThreadNumber);
}

Flink interface

public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> {
            return lookupFunction;
        };
    }

    LookupFunction createLookupFunction();
}

When implementing LookupFunctionProvider, a LookupFunction has to be supplied, and the main logic lives inside that LookupFunction.

LookupFunction(Flink)

public abstract class LookupFunction extends TableFunction<RowData> {
    public LookupFunction() {
    }
    // This is the main join logic that needs to be implemented.
    public abstract Collection<RowData> lookup(RowData var1) throws IOException;

    public final void eval(Object... keys) {
        GenericRowData keyRow = GenericRowData.of(keys);

        try {
            Collection<RowData> lookup = this.lookup(keyRow);
            if (lookup != null) {
                lookup.forEach(this::collect);
            }
        } catch (IOException var4) {
            throw new RuntimeException(String.format("Failed to lookup values with given key row '%s'", keyRow), var4);
        }
    }
}

Paimon's implementation of LookupFunction:

public class NewLookupFunction extends LookupFunction {

    private static final long serialVersionUID = 1L;

    private final FileStoreLookupFunction function;

    public NewLookupFunction(FileStoreLookupFunction function) {
        this.function = function;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        function.open(context);
    }

    @Override
    public Collection<RowData> lookup(RowData keyRow) throws IOException {
        return function.lookup(keyRow);
    }

    @Override
    public void close() throws Exception {
        function.close();
    }
}

As you can see, the main logic lives in FileStoreLookupFunction:

public Collection<RowData> lookup(RowData keyRow) {
    try {
        checkRefresh();
        List<InternalRow> results = lookupTable.get(new FlinkRowWrapper(keyRow));
        List<RowData> rows = new ArrayList<>(results.size());
        for (InternalRow matchedRow : results) {
            rows.add(new FlinkRowData(matchedRow));
        }
        return rows;
    } catch (OutOfRangeException e) {
        reopen();
        return lookup(keyRow);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

The above covers the Source-related implementation.

Sink Interface

On the sink side, Paimon needs to implement Flink's DynamicTableSink interface:

public interface DynamicTableSink {
    ChangelogMode getChangelogMode(ChangelogMode var1);

    SinkRuntimeProvider getSinkRuntimeProvider(Context var1);

    DynamicTableSink copy();

    String asSummaryString();

    @PublicEvolving
    public interface SinkRuntimeProvider {
    }

    @PublicEvolving
    public interface DataStructureConverter extends RuntimeConverter {
        @Nullable
        Object toExternal(@Nullable Object var1);
    }

    @PublicEvolving
    public interface Context {
        boolean isBounded();

        <T> TypeInformation<T> createTypeInformation(DataType var1);

        <T> TypeInformation<T> createTypeInformation(LogicalType var1);

        DataStructureConverter createDataStructureConverter(DataType var1);

        Optional<int[][]> getTargetColumns();
    }
}

Paimon's implementation:

public DynamicTableSink createDynamicTableSink(Context context) {
    Table table = buildPaimonTable(context);
    if (table instanceof FileStoreTable) {
        storeTableLineage(
                ((FileStoreTable) table).catalogEnvironment().lineageMetaFactory(),
                context,
                (entity, lineageFactory) -> {
                    try (LineageMeta lineage =
                            lineageFactory.create(() -> Options.fromMap(table.options()))) {
                        lineage.saveSinkTableLineage(entity);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
    }
    return new FlinkTableSink(
            context.getObjectIdentifier(),
            table,
            context,
            createOptionalLogStoreFactory(context).orElse(null));
}
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    if (overwrite && !context.isBounded()) {
        throw new UnsupportedOperationException(
                "Paimon doesn't support streaming INSERT OVERWRITE.");
    }

    LogSinkProvider logSinkProvider = null;
    if (logStoreTableFactory != null) {
        logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
    }

    Options conf = Options.fromMap(table.options());
    // Do not sink to log store when overwrite mode
    final LogSinkFunction logSinkFunction =
            overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
    return new PaimonDataStreamSinkProvider(
            (dataStream) ->
                    new FlinkSinkBuilder((FileStoreTable) table)
                            .withInput(
                                    new DataStream<>(
                                            dataStream.getExecutionEnvironment(),
                                            dataStream.getTransformation()))
                            .withLogSinkFunction(logSinkFunction)
                            .withOverwritePartition(overwrite ? staticPartitions : null)
                            .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
                            .withBoundedInputStream(context.isBounded())
                            .build());
}

Similar to the source side, Paimon needs to provide a SinkRuntimeProvider, and the main logic lives inside it.

SinkRuntimeProvider (Flink interface)

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
        return this.consumeDataStream(dataStream);
    }

    /** @deprecated */
    @Deprecated
    default DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        throw new UnsupportedOperationException("This method is deprecated. Use consumeDataStream(ProviderContext, DataStream<RowData>) instead");
    }

    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

As you can see, an implementation of this interface receives a DataStream and returns a DataStreamSink, which means Paimon can keep chaining further operations onto the incoming DataStream.

public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {

    private final Function<DataStream<RowData>, DataStreamSink<?>> producer;

    public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) {
        this.producer = producer;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        return producer.apply(dataStream);
    }
}

The main logic lives in the producer, i.e. the lambda passed to PaimonDataStreamSinkProvider in getSinkRuntimeProvider above.

The producer's main logic, in turn, is in FlinkSinkBuilder.build():

public DataStreamSink<?> build() {
    DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
    if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
        input =
                input.forward()
                        .transform(
                                "local merge",
                                input.getType(),
                                new LocalMergeOperator(table.schema()))
                        .setParallelism(input.getParallelism());
    }

    BucketMode bucketMode = table.bucketMode();
    switch (bucketMode) {
        case FIXED:
            return buildForFixedBucket(input);
        case DYNAMIC:
            return buildDynamicBucketSink(input, false);
        case GLOBAL_DYNAMIC:
            return buildDynamicBucketSink(input, true);
        case UNAWARE:
            return buildUnawareBucketSink(input);
        default:
            throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
    }
}

There are different implementations depending on the bucket mode; here we look at the buildForFixedBucket method.

private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
    DataStream<InternalRow> partitioned =
            partition(
                    input,
                    new RowDataChannelComputer(table.schema(), logSinkFunction != null),
                    parallelism);
    FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, logSinkFunction);
    return sink.sinkFrom(partitioned);
}

Next we step into the FixedBucketSink class:

public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
    assertNoSinkMaterializer(input);

    // do the actually writing action, no snapshot generated in this stage
    DataStream<Committable> written = doWrite(input, initialCommitUser, input.getParallelism());

    // commit the committable to generate a new snapshot
    return doCommit(written, initialCommitUser);
}

As you can see, starting from the incoming input stream, Paimon first attaches a writer DataStream and then a commit DataStream.

public DataStream<Committable> doWrite(
        DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
    StreamExecutionEnvironment env = input.getExecutionEnvironment();
    boolean isStreaming =
            StreamExecutionEnvironmentUtils.getConfiguration(env)
                            .get(ExecutionOptions.RUNTIME_MODE)
                    == RuntimeExecutionMode.STREAMING;

    boolean writeOnly = table.coreOptions().writeOnly();
    SingleOutputStreamOperator<Committable> written =
            input.transform(
                            (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME)
                                    + " : "
                                    + table.name(),
                            new CommittableTypeInfo(),
                            createWriteOperator(
                                    createWriteProvider(env.getCheckpointConfig(), isStreaming),
                                    commitUser))
                    .setParallelism(parallelism == null ? input.getParallelism() : parallelism);

    if (!isStreaming) {
        assertBatchConfiguration(env, written.getParallelism());
    }

    Options options = Options.fromMap(table.options());
    if (options.get(SINK_USE_MANAGED_MEMORY)) {
        declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
    }
    return written;
}

The doWrite method wires in a custom operator that writes bucket data into Paimon and also handles work such as compaction.

protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
    StreamExecutionEnvironment env = written.getExecutionEnvironment();
    ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    boolean isStreaming =
            conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
    boolean streamingCheckpointEnabled =
            isStreaming && checkpointConfig.isCheckpointingEnabled();
    if (streamingCheckpointEnabled) {
        assertStreamingConfiguration(env);
    }

    OneInputStreamOperator<Committable, Committable> committerOperator =
            new CommitterOperator<>(
                    streamingCheckpointEnabled,
                    commitUser,
                    createCommitterFactory(streamingCheckpointEnabled),
                    createCommittableStateManager());
    if (Options.fromMap(table.options()).get(SINK_AUTO_TAG_FOR_SAVEPOINT)) {
        committerOperator =
                new AutoTagForSavepointCommitterOperator<>(
                        (CommitterOperator<Committable, ManifestCommittable>) committerOperator,
                        table::snapshotManager,
                        table::tagManager,
                        () -> table.store().newTagDeletion(),
                        () -> table.store().createTagCallbacks());
    }
    if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
            && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) {
        committerOperator =
                new BatchWriteGeneratorTagOperator<>(
                        (CommitterOperator<Committable, ManifestCommittable>) committerOperator,
                        table);
    }
    SingleOutputStreamOperator<?> committed =
            written.transform(
                            GLOBAL_COMMITTER_NAME + " : " + table.name(),
                            new CommittableTypeInfo(),
                            committerOperator)
                    .setParallelism(1)
                    .setMaxParallelism(1);
    Options options = Options.fromMap(table.options());
    configureGlobalCommitter(
            committed,
            options.get(SINK_COMMITTER_CPU),
            options.get(SINK_COMMITTER_MEMORY),
            conf);
    return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

The commit step is likewise a custom operator that commits the metadata. Finally, an empty DiscardingSink is attached to terminate the topology.

Catalog

For Flink to operate on Paimon's metadata, the Catalog interface must be implemented:

public interface Catalog {
    // This method connects the catalog with the table connector; it must return the table's factory.
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }

    /** @deprecated */
    @Deprecated
    default Optional<TableFactory> getTableFactory() {
        return Optional.empty();
    }

    default Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
        return Optional.empty();
    }

    void open() throws CatalogException;

    void close() throws CatalogException;

    String getDefaultDatabase() throws CatalogException;

    List<String> listDatabases() throws CatalogException;

    CatalogDatabase getDatabase(String var1) throws DatabaseNotExistException, CatalogException;

    boolean databaseExists(String var1) throws CatalogException;

    void createDatabase(String var1, CatalogDatabase var2, boolean var3) throws DatabaseAlreadyExistException, CatalogException;

    default void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
        this.dropDatabase(name, ignoreIfNotExists, false);
    }

    void dropDatabase(String var1, boolean var2, boolean var3) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;

    void alterDatabase(String var1, CatalogDatabase var2, boolean var3) throws DatabaseNotExistException, CatalogException;

    List<String> listTables(String var1) throws DatabaseNotExistException, CatalogException;

    List<String> listViews(String var1) throws DatabaseNotExistException, CatalogException;

    CatalogBaseTable getTable(ObjectPath var1) throws TableNotExistException, CatalogException;

    boolean tableExists(ObjectPath var1) throws CatalogException;

    void dropTable(ObjectPath var1, boolean var2) throws TableNotExistException, CatalogException;

    void renameTable(ObjectPath var1, String var2, boolean var3) throws TableNotExistException, TableAlreadyExistException, CatalogException;

    void createTable(ObjectPath var1, CatalogBaseTable var2, boolean var3) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

    void alterTable(ObjectPath var1, CatalogBaseTable var2, boolean var3) throws TableNotExistException, CatalogException;

    List<CatalogPartitionSpec> listPartitions(ObjectPath var1) throws TableNotExistException, TableNotPartitionedException, CatalogException;

    List<CatalogPartitionSpec> listPartitions(ObjectPath var1, CatalogPartitionSpec var2) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;

    List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath var1, List<Expression> var2) throws TableNotExistException, TableNotPartitionedException, CatalogException;

    CatalogPartition getPartition(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;

    boolean partitionExists(ObjectPath var1, CatalogPartitionSpec var2) throws CatalogException;

    void createPartition(ObjectPath var1, CatalogPartitionSpec var2, CatalogPartition var3, boolean var4) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException;

    void dropPartition(ObjectPath var1, CatalogPartitionSpec var2, boolean var3) throws PartitionNotExistException, CatalogException;

    void alterPartition(ObjectPath var1, CatalogPartitionSpec var2, CatalogPartition var3, boolean var4) throws PartitionNotExistException, CatalogException;

    List<String> listFunctions(String var1) throws DatabaseNotExistException, CatalogException;

    CatalogFunction getFunction(ObjectPath var1) throws FunctionNotExistException, CatalogException;

    boolean functionExists(ObjectPath var1) throws CatalogException;

    void createFunction(ObjectPath var1, CatalogFunction var2, boolean var3) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;

    void alterFunction(ObjectPath var1, CatalogFunction var2, boolean var3) throws FunctionNotExistException, CatalogException;

    void dropFunction(ObjectPath var1, boolean var2) throws FunctionNotExistException, CatalogException;

    CatalogTableStatistics getTableStatistics(ObjectPath var1) throws TableNotExistException, CatalogException;

    CatalogColumnStatistics getTableColumnStatistics(ObjectPath var1) throws TableNotExistException, CatalogException;

    CatalogTableStatistics getPartitionStatistics(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;

    CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath var1, CatalogPartitionSpec var2) throws PartitionNotExistException, CatalogException;

    void alterTableStatistics(ObjectPath var1, CatalogTableStatistics var2, boolean var3) throws TableNotExistException, CatalogException;

    void alterTableColumnStatistics(ObjectPath var1, CatalogColumnStatistics var2, boolean var3) throws TableNotExistException, CatalogException, TablePartitionedException;

    void alterPartitionStatistics(ObjectPath var1, CatalogPartitionSpec var2, CatalogTableStatistics var3, boolean var4) throws PartitionNotExistException, CatalogException;

    void alterPartitionColumnStatistics(ObjectPath var1, CatalogPartitionSpec var2, CatalogColumnStatistics var3, boolean var4) throws PartitionNotExistException, CatalogException;
}

Paimon's catalog implementation overrides getFactory to return its table factory:

public Optional<Factory> getFactory() {
    return Optional.of(new FlinkTableFactory());
}

Write

Data is first written into memory and later flushed to disk.

In the writer node, the buffer is flushed to disk before a checkpoint or when memory is full; the resulting file metadata is returned and sent downstream.

Flushing produces sorted runs at level 0, where each file is its own sorted run; for levels above level 0, each level is a single sorted run, and the files within such a run have no overlapping key ranges.
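
As a toy illustration only (names are placeholders, not Paimon's actual write buffer), the "buffer in memory, flush before a checkpoint or when full, each flush becomes one level-0 sorted run" behavior can be pictured like this:

import java.util.ArrayList;
import java.util.List;

public class WriteBufferExample {

    private final List<String> buffer = new ArrayList<>();
    private final int capacity;

    WriteBufferExample(int capacity) {
        this.capacity = capacity;
    }

    /** Writes go into memory first; flush to disk as a new level-0 sorted run when full. */
    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush("buffer full");
        }
    }

    /** Also called just before a checkpoint so the snapshot only references files on disk. */
    void prepareCommit() {
        flush("checkpoint");
    }

    private void flush(String reason) {
        if (buffer.isEmpty()) {
            return;
        }
        buffer.sort(String::compareTo); // a flushed run is sorted, i.e. one level-0 sorted run
        System.out.println("flush " + buffer.size() + " records (" + reason + ")");
        buffer.clear();
    }
}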

Compaction

  1. Before flushing to disk (just before a checkpoint, or when memory is full), a compaction is attempted.
  2. The compaction strategy decides which sorted runs to compact:

    1. Full compaction

      1. All sorted runs take part in the compaction.
      2. Triggers: changelog-producer = full-compaction, batch execution mode, related configuration options, size amplification.
    2. Universal compaction

      • Space amplification triggers a full compaction.
      • The individual size ratio triggers a minor compaction: when Size(R2) / Size(R1) <= IndividualSizeRatio, both R1 and R2 are compacted, so that neighboring sorted runs stay roughly aligned in size (see the sketch after this list).
      • num-sorted-run.compaction-trigger triggers a minor compaction.
    3. ForceUpLevel0Compaction:

      • Applies when merge-engine = first-row, changelog-producer = lookup, or deletion vectors are enabled.
      • During compaction all level-0 files are forcibly rewritten (changelog or deletion vectors need to be produced, so level 0 has to be compacted).
  3. The work is wrapped in a MergeTreeCompactTask (a Callable returning a CompactResult) and submitted to a thread pool (metrics are reported at runtime).
  4. The compactRewriter is executed, producing changelog and deletion vectors as needed (producing changelog invokes the merge function to merge records, e.g. for aggregation, first-row, or partial-update tables).

    1. Reading uses a multi-way merge sort (see pip2).
    2. Changelog is produced when changelog-producer = lookup | full-compaction, or when deletion vectors are enabled.
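
The size-ratio rule of universal compaction can be illustrated with the simplified sketch below, loosely following the RocksDB-style universal compaction idea; the method name and the percentage form of the threshold are assumptions for illustration, not Paimon's actual code.

import java.util.List;

public class SizeRatioExample {

    /**
     * Returns how many of the youngest sorted runs should be merged together. Starting
     * from the youngest run, the next run is added to the candidate set as long as its
     * size is not much larger than the candidates accumulated so far, which keeps
     * neighboring sorted runs roughly aligned in size.
     */
    static int runsToMerge(List<Long> sortedRunSizes, int sizeRatioPercent) {
        long candidateSize = sortedRunSizes.get(0);
        int count = 1;
        for (int i = 1; i < sortedRunSizes.size(); i++) {
            long next = sortedRunSizes.get(i);
            if (next > candidateSize * (100 + sizeRatioPercent) / 100) {
                break; // the next run is too large, stop growing the candidate set
            }
            candidateSize += next;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // youngest run first: 10, 12, 13, 200 -> the first three runs are merged
        System.out.println(runsToMerge(List.of(10L, 12L, 13L, 200L), 100)); // prints 3
    }
}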

Consumer-id + primary key + fixed bucket

Source

MonitorSource

Mainly responsible for reading snapshots and sending the DataSplits down to the downstream operator.
During the snapshot the snapshot id is stored, and when the checkpoint completes (notifyCheckpointComplete) the consumer-id is written.
In pollNext, the splits (metadata) are sent down to the downstream reader operator, obtained via readerBuilder.scan.plan.split. Since all of Flink's filters were already passed in when the ReadBuilder was created, a large amount of filtering based on the metadata happens during plan.

ReadOperator

Before the DataSplits reach the ReadOperator, they are shuffled by partition + bucket so that data for the same partition + bucket always goes to the same ReadOperator subtask.

The ReadOperator mainly uses the information in the DataSplit to read the actual data files, and it reports some metrics (note: SOURCE_IDLE_TIME was implemented by me). When reading the actual data files it also applies filter conditions, for example the file index or deletion vectors.
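
The partition + bucket shuffle mentioned above can be sketched as follows (illustrative only; this is not Paimon's actual channel computer, just the idea that the same partition + bucket always maps to the same reader subtask):

import java.util.Objects;

public class PartitionBucketChannelExample {

    /** Picks a downstream channel from the (partition, bucket) pair. */
    static int selectChannel(String partition, int bucket, int numChannels) {
        int hash = Objects.hash(partition, bucket);
        return Math.floorMod(hash, numChannels);
    }

    public static void main(String[] args) {
        // The same partition + bucket always lands on the same reader subtask.
        System.out.println(selectChannel("dt=2024-02-12", 3, 8));
        System.out.println(selectChannel("dt=2024-02-12", 3, 8)); // same channel again
    }
}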

Sink

RowDataStoreWriteOperator

Mainly writes data to disk and performs compaction; the compaction path also covers operations such as snapshot expiration.
When writing, data first goes into the write buffer (off-heap memory allocated via sun.misc.Unsafe and managed/released by Paimon itself); once the buffer is full it is flushed to disk, and a flush is also forced before each snapshot. While writing, records are merged according to the configured merge function. Changelog writing depends on the changelog-producer: with input it is written directly, with none nothing is written, and with lookup or full-compaction the changelog is written during compaction.

CommitterOperator

Mainly writes the metadata produced upstream to disk.
In processElement, the committables coming from upstream are first buffered in an in-memory Deque.
During the snapshot, all Committables held in memory are aggregated by checkpoint id into ManifestCommittables, which are then stored in state.
When the checkpoint completes (notifyCheckpointComplete), the metadata is committed to external storage. The commit also performs conflict detection: on a snapshot conflict (two writers committing at the same time) the current attempt is abandoned, the write is retried, and the metadata files produced by the previous snapshot are merged in (Paimon implements this as a while(true) loop).
A file conflict mainly arises because compaction marks files as logically deleted: if a previous snapshot has already marked some file as logically deleted and the current snapshot marks it again, that is inconsistent, so a restart is needed and this compaction snapshot is discarded.
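
The buffering and per-checkpoint aggregation described above can be sketched like this (illustrative only; the class and field names are placeholders, not Paimon's Committable or ManifestCommittable types):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CommitAggregationExample {

    /** A stand-in for a committable: file metadata tagged with a checkpoint id. */
    static class Committable {
        final long checkpointId;
        final String fileMeta;

        Committable(long checkpointId, String fileMeta) {
            this.checkpointId = checkpointId;
            this.fileMeta = fileMeta;
        }
    }

    private final Deque<Committable> inputs = new ArrayDeque<>();

    /** processElement: buffer everything the upstream writers send. */
    void processElement(Committable committable) {
        inputs.add(committable);
    }

    /** snapshotState: fold the buffered committables into one aggregate per checkpoint id. */
    Map<Long, List<String>> aggregateByCheckpoint() {
        Map<Long, List<String>> aggregated = new TreeMap<>();
        while (!inputs.isEmpty()) {
            Committable c = inputs.poll();
            aggregated.computeIfAbsent(c.checkpointId, k -> new ArrayList<>()).add(c.fileMeta);
        }
        return aggregated;
    }
}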

Overview

Indexes are an important data structure for speeding up data queries and access. Users can create indexes on selected fields so that lookups are faster at query time. Paimon currently supports two kinds of file index, BloomFilter and Bitmap. In version 0.9.0 the file index supports append-only tables, and the master branch already supports primary-key tables as well.

File Index Principles

A file index essentially means that, while writing data, an additional index file is written for the indexed fields; when a query runs, the predicates from the WHERE clause are first evaluated against the index file to prune data.
Paimon's overall filtering flow is as follows:

Read

index1.jpg
When Flink consumes Paimon in consumer-id mode, two operators are generated: a monitor operator (which reads metadata and produces metadata) and a reader operator (which actually reads the data in parallel). The monitor mainly reads snapshot information, applies some filter conditions to Paimon's metadata, and finally produces splits (the metadata that survives this filtering), which are sent down to the reader operator. The reader operator then uses the split information to actually read the data from HDFS.
Paimon's index can be stored in two places: in the manifest (when the index file is small) or alongside the data file (when the index file is large); the threshold is 500 bytes. (If a very large index were read in the monitor, there would be a risk of OOM.)
As the figure above shows, index-based filtering can happen during the scan as well as when the data is actually read. In its FlinkSource implementation, Paimon implements Flink's SupportsFilterPushDown, which passes the filter information to Paimon, as shown in the figure below:
index2.png
Index in the metadata
After receiving the filters from Flink, Paimon converts them into its own Predicates and passes them down level by level; the metadata is ultimately filtered in AbstractFileStoreScan#plan. As shown in the circled places in the figure below, filtering happens in several spots, and not only based on the index: Paimon's existing optimizations such as partition filtering also live here.
index3.png
Index in the data file
The logic for reading data files lives in Flink's ReadOperator; the data is filtered in the processElement method while reading a split.
index4.png

Write

On the write path, the indexed values are additionally written into the index file while the data itself is being written.
index5.png
Storage format

  _____________________________________    _____________________
|     magic    |version|head length |
|-------------------------------------|
|            column number            |
|-------------------------------------|
|   column 1        | index number   |
|-------------------------------------|
|  index name 1 |start pos |length  |
|-------------------------------------|
|  index name 2 |start pos |length  |
|-------------------------------------|
|  index name 3 |start pos |length  |
|-------------------------------------|            HEAD
|   column 2        | index number   |
|-------------------------------------|
|  index name 1 |start pos |length  |
|-------------------------------------|
|  index name 2 |start pos |length  |
|-------------------------------------|
|  index name 3 |start pos |length  |
|-------------------------------------|
|                 ...                 |
|-------------------------------------|
|                 ...                 |
|-------------------------------------|
|  redundant length |redundant bytes |
|-------------------------------------|    ---------------------
|                BODY                 |
|                BODY                |
|                BODY                 |             BODY
|                BODY                 |
|_____________________________________|    _____________________

column number: the number of indexed columns
column 1: one indexed column
index number: the number of index types configured for this column (a column can have multiple indexes)
index name 1 | start pos | length: the index name, plus the start position and length of the index body
BODY: the actual index content

Bloom Filter

Usage
file-index.bloom-filter.columns: the columns the user wants to index.
file-index.bloom-filter.<column_name>.fpp: the accepted false-positive probability for this column, default 0.1.
file-index.bloom-filter.<column_name>.items: the expected number of distinct items for this column, default 100000.
Limitations

  1. For a newly created table it is enough to set 'file-index.bloom-filter.columns'. For an existing table, the index files need to be backfilled with CALL sys.rewrite_file_index(table => 'test_db.T'), and the job writing to the table has to be restarted so that the index configuration takes effect.
  2. Unsupported data types: Array, Multiset, Map, Row, Decimal, Boolean.

Principle
The Bloom filter mainly serves equality filtering. For a query like select * from t where b = 1, the Bloom filter is consulted first: if the filter says the value is absent, an empty result is returned immediately; only if the value may be present is the actual data read.
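
The skip-or-read decision can be illustrated with the conceptual sketch below; this is a hand-rolled toy Bloom filter, not Paimon's file index API.

import java.util.BitSet;

public class BloomFilterSkipExample {

    static class SimpleBloomFilter {
        private final BitSet bits;
        private final int numHashes;
        private final int size;

        SimpleBloomFilter(int size, int numHashes) {
            this.bits = new BitSet(size);
            this.numHashes = numHashes;
            this.size = size;
        }

        void add(Object value) {
            for (int i = 0; i < numHashes; i++) {
                bits.set(Math.floorMod(value.hashCode() * 31 + i * 0x9E3779B9, size));
            }
        }

        /** false means the value is definitely not in the file; true means it may be. */
        boolean mightContain(Object value) {
            for (int i = 0; i < numHashes; i++) {
                if (!bits.get(Math.floorMod(value.hashCode() * 31 + i * 0x9E3779B9, size))) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        SimpleBloomFilter index = new SimpleBloomFilter(1024, 3);
        index.add(1);
        index.add(2);
        // WHERE b = 3: the filter says "definitely absent", so the data file is skipped.
        System.out.println(index.mightContain(3) ? "read file" : "skip file");
    }
}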

Bitmap

Usage
Add file-index.bitmap.columns to the configuration.
Limitations

  1. For a newly created table it is enough to set 'file-index.bitmap.columns'. For an existing table, the index files need to be backfilled with CALL sys.rewrite_file_index(table => 'test_db.T'), and the job writing to the table has to be restarted so that the index configuration takes effect.
  2. Unsupported data types:

    Map
    Row
    Multiset
    Array
    Decimal
    Binary
    Varbinary
    

Principle
The bitmap index stores the indexed field values in bitmaps. Whenever a WHERE condition references an indexed field, the bitmap is consulted first: if the value exists, the actual data is read; if not, an empty result is returned directly. It can serve filters such as IN, =, and NOT IN.
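
A value-to-row-position bitmap and how an = / IN predicate uses it can be sketched as follows (conceptual only, not Paimon's bitmap index implementation):

import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

public class BitmapIndexExample {

    private final Map<Object, BitSet> valueToPositions = new HashMap<>();

    /** Called while writing: remember at which row positions each value occurs. */
    void add(Object value, int rowPosition) {
        valueToPositions.computeIfAbsent(value, v -> new BitSet()).set(rowPosition);
    }

    /** Called while reading: positions matching "column IN (values)"; empty means skip the file. */
    BitSet positionsFor(Object... values) {
        BitSet result = new BitSet();
        for (Object v : values) {
            BitSet positions = valueToPositions.get(v);
            if (positions != null) {
                result.or(positions);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        BitmapIndexExample index = new BitmapIndexExample();
        index.add("a", 0);
        index.add("b", 1);
        index.add("a", 2);
        System.out.println(index.positionsFor("a")); // {0, 2}
        System.out.println(index.positionsFor("c")); // {} -> nothing to read
    }
}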

Overview
Deletion vectors are a mechanism introduced in Paimon 0.8 to improve the read performance of Paimon tables. A deletion file is generated at write time (sacrificing some write performance), so that at read time the data to be read can be filtered through the deletion file, which avoids the cost of merging different files. The file is stored at: manifest-index-manifest-5d670043-da25-4265-9a26-e31affc98039-0

Usage
Set 'deletion-vectors.enabled' = 'true' in the configuration.
Restrictions
changelog-producer must be set to none or lookup.
changelog-producer.lookup-wait must not be set to false.
merge-engine must not be first-row.
This mode needs to filter out level-0 data, so when using time travel to read an APPEND snapshot there will be data latency.
Implementation
When a user updates old data, a deletion file is produced; it mainly marks which rows in which data files have been deleted.
paimon-deve.png
The structure of the deletion file is as follows:
paimon-deve-2.png
There is one deletion file per bucket, and the data inside is stored as a map<file_name, bitmap>. When reading a given data file, the deletion file is read first and a bitmap is built for that file_name; the data file is then read and filtered through the bitmap.
During compaction a new deletion file is generated and the old one is marked as deleted. Deletion files are tied to their corresponding snapshot.
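
The read-side filtering through the per-file bitmap can be sketched like this (illustrative only, with simplified types instead of Paimon's deletion vector classes):

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

public class DeletionVectorReadExample {

    /** Reads a data file's rows and drops the positions marked as deleted for that file. */
    static List<String> readWithDeletionVector(
            String fileName, List<String> rows, Map<String, BitSet> deleteFile) {
        BitSet deleted = deleteFile.getOrDefault(fileName, new BitSet());
        List<String> result = new ArrayList<>();
        for (int pos = 0; pos < rows.size(); pos++) {
            if (!deleted.get(pos)) { // position not marked as deleted -> emit the row
                result.add(rows.get(pos));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, BitSet> deleteFile = Map.of("data-0.orc", new BitSet());
        deleteFile.get("data-0.orc").set(1); // the row at position 1 was deleted by an update
        System.out.println(
                readWithDeletionVector("data-0.orc", List.of("r0", "r1", "r2"), deleteFile));
        // prints [r0, r2]
    }
}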
JSON

{
  "org.apache.paimon.avro.generated.record": {
    "_VERSION": 1,
    "_KIND": 0,
    "_PARTITION": "\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p\u0000\u0000\u0000\u0000\u0000\u0000",
    "_BUCKET": 0,
    "_TYPE": "DELETION_VECTORS",
    "_FILE_NAME": "index-32f16270-5a81-4e5e-9f93-0e096b8b58d3-0",
    "_FILE_SIZE": x,
    "_ROW_COUNT": count of the map,
    "_DELETION_VECTORS_RANGES": "binary Map<String, Pair<Integer, Integer>>", key is the fileName, value is <start offset of the serialized bitmap in Index file, size of the serialized bitmap>
  }
}

Write
The deletion file is generated based on compaction + lookup:

  1. New data is written into level 0.
  2. After each write a compaction is executed, and level-0 data is forcibly merged.
  3. A merge similar to LookupDeleteFileMergeFunctionWrapper is performed, with the following behavior (a simplified sketch follows this list):
    a. If a record does not belong to level 0, no deletion is produced.
    b. If a record belongs to level 0 plus some level x, no deletion is produced (both level 0 and level x, among the levels selected for this compaction, contain this record).
    c. If a record belongs only to level 0, the other levels are looked up and the map for the deletion file is generated (among the compacted levels, only level 0 contains this record).
  4. After the compaction finishes, the bitmaps in the old file are no longer used; a new file containing the new bitmaps is generated.
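
The level-0 lookup idea from item 3 can be sketched as follows; the names are placeholders, and the real LookupDeleteFileMergeFunctionWrapper is considerably more involved.

import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LookupDeletionExample {

    /** Position of a key in an older data file: (fileName, rowPosition). */
    static class FilePosition {
        final String fileName;
        final int rowPosition;

        FilePosition(String fileName, int rowPosition) {
            this.fileName = fileName;
            this.rowPosition = rowPosition;
        }
    }

    /** Looks a key up in the levels that are not part of this compaction. */
    interface HigherLevelLookup {
        Optional<FilePosition> find(String key);
    }

    /** For every key seen only at level 0, mark its older copy as deleted: file name -> position bitmap. */
    static Map<String, BitSet> buildDeletions(
            Iterable<String> level0OnlyKeys, HigherLevelLookup lookup) {
        Map<String, BitSet> deletions = new HashMap<>();
        for (String key : level0OnlyKeys) {
            lookup.find(key).ifPresent(pos ->
                    deletions.computeIfAbsent(pos.fileName, f -> new BitSet())
                            .set(pos.rowPosition));
        }
        return deletions;
    }
}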

Read
The read-side optimization is that batch reads can read the files of a single bucket concurrently and then use the deletion file to drop the stale historical rows. Without a deletion file, an MOR read has to use a single parallelism per bucket, because the data needs to be sorted and the historical rows removed. Since deletion files are produced during compaction, the read must not include the level-0 files.
Example:
paimon-deve-3.png
Test
This test only covers data correctness; no performance benchmark is run here. The official benchmark results can be found at https://mp.weixin.qq.com/s/7VptRdZU6mGQlPiv4silEA
Insert some data
SQL

insert into test_ver values(1,1);
insert into test_ver values(2,2);
insert into test_ver values(3,33);
update test_ver set age = 2 where (user_id=1 or user_id = 2);

Inspect the generated index file
JSON

{
        "org.apache.paimon.avro.generated.record": {
                "_VERSION": 1,
                "_KIND": 0,
                "_PARTITION": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000",
                "_BUCKET": 0,
                "_INDEX_TYPE": "DELETION_VECTORS",
                "_FILE_NAME": "index-6d72d7bc-a6b8-4643-9e34-80bdb80ffd4c-0",
                "_FILE_SIZE": 33,
                "_ROW_COUNT": 1,
                "_DELETIONS_VECTORS_RANGES": {
                        "array": [{
                                "org.apache.paimon.avro.generated.record__DELETIONS_VECTORS_RANGES": {
                                        "f0": "data-f854406a-f936-449e-a66c-1c4e1446c5b1-0.orc",
                                        "f1": 1,
                                        "f2": 24
                                }
                        }]
                }
        }
}

Note
With 'deletion-vectors.enabled' = 'true', reading the full data set depends on a compaction snapshot. If the table is set to write-only and no compaction job is running, a batch query, or a streaming query of the full historical data files, will return no data.