In the article on Flink SQL parsing, we looked at how filters are pushed down from SQL to Paimon. In this article, we look at how Paimon uses those filters for optimization.

Flink PushDown

The interfaces through which Flink pushes filtering information down to Paimon:

1. SupportsFilterPushDown pushes down the filters from the WHERE clause. Note that the result contains two lists: acceptedFilters are the filters the source can apply itself, and remainingFilters are the ones it cannot. acceptedFilters helps Flink optimize the execution plan: a filter that would otherwise have to run in the DataStream can instead be applied by the source while reading, so Flink no longer needs to generate the corresponding operator (see the sketch after the interface definition below).

2. SupportsProjectionPushDown pushes down the projection, i.e. the columns chosen in the SELECT clause. The not-yet-released Paimon 1.0 already supports pushing down nested types (a row nested inside another row). This pruning mainly allows column-wise reads through the corresponding interfaces when reading Parquet or ORC files (see the sketch after the interface definition below).

public interface SupportsFilterPushDown {
    Result applyFilters(List<ResolvedExpression> filters);
    final class Result {
        private final List<ResolvedExpression> acceptedFilters;
        private final List<ResolvedExpression> remainingFilters;

        private Result(
                List<ResolvedExpression> acceptedFilters,
                List<ResolvedExpression> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }
    }
}
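
As an illustration of the accepted/remaining split, here is a minimal sketch of a hypothetical connector (not Paimon's actual source); canPushDown and pushedFilters are made-up placeholders:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

/** Hypothetical source: keeps the filters it can translate, hands the rest back to Flink. */
public class MyFilterableSource implements SupportsFilterPushDown {

    private List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            // Placeholder check, e.g. "can this expression be converted to a Paimon Predicate?"
            if (canPushDown(filter)) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        this.pushedFilters = accepted; // remembered and applied later when building the reader
        return Result.of(accepted, remaining);
    }

    private boolean canPushDown(ResolvedExpression filter) {
        return false; // placeholder for a real conversion check
    }
}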

public interface SupportsProjectionPushDown {
    boolean supportsNestedProjection();
    @Deprecated
    default void applyProjection(int[][] projectedFields) {
      
    }
    default void applyProjection(int[][] projectedFields, DataType producedDataType) {
        applyProjection(projectedFields);
    }
}
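
A matching sketch (again hypothetical) for projection pushdown; the int[][] argument is a list of field paths, where a path of length two addresses a nested field:

import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.DataType;

/** Hypothetical source: records the projected field paths for later column pruning. */
public class MyProjectableSource implements SupportsProjectionPushDown {

    private int[][] projectedFields;

    @Override
    public boolean supportsNestedProjection() {
        // true means Flink may pass nested paths such as {2, 1}
        // (field 1 inside the row stored at top-level position 2)
        return true;
    }

    @Override
    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
        // e.g. SELECT a, c.x FROM t -> {{0}, {2, 1}} if a is field 0 and c is field 2
        this.projectedFields = projectedFields;
    }
}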

Paimon Filter Optimization

Based on the master branch (version 1.0 not yet released).

Paimon's filtering as a whole happens in two stages:

1. When reading metadata, use the statistics stored in it to keep only the DataSplits that actually need to be read and send those downstream.

2. When reading data files, filter using the file index, the deletion vector information (row-level filtering), and the projection information (column-level filtering).

Reader Meta

Code entry point: MonitorSource (i.e. streaming consumption in consumer-id mode).

Paimon's metadata stores plenty of statistics, for example max/min/nullCount of the partition fields, and max/min/nullCount of both the keys and the values of each data file.
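
To make the idea concrete, here is an illustrative sketch (not Paimon's internal types) of how max/min statistics let a whole file be skipped for a predicate such as col > 10:

/** Illustrative only: prune a data file for "col > 10" using its min/max statistics. */
final class StatsPruningSketch {

    /** Per-file statistics as they conceptually appear in the manifest entries. */
    static final class FieldStats {
        final long min;       // minimum value of the field within the file
        final long max;       // maximum value of the field within the file
        final long nullCount; // number of rows where the field is NULL

        FieldStats(long min, long max, long nullCount) {
            this.min = min;
            this.max = max;
            this.nullCount = nullCount;
        }
    }

    /** For "field > literal" the file can be skipped when even its max cannot satisfy it. */
    static boolean mayContainGreaterThan(FieldStats stats, long literal) {
        return stats.max > literal;
    }

    public static void main(String[] args) {
        FieldStats file1 = new FieldStats(1, 5, 0);  // values in [1, 5]
        FieldStats file2 = new FieldStats(8, 42, 3); // values in [8, 42]
        System.out.println(mayContainGreaterThan(file1, 10)); // false -> file pruned
        System.out.println(mayContainGreaterThan(file2, 10)); // true  -> file kept
    }
}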

When building the source in FlinkSourceBuilder, Paimon stores the corresponding filter information in the ReadBuilder:

org.apache.paimon.flink.source.FlinkSourceBuilder#createReadBuilder
    private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType readType) {
        ReadBuilder readBuilder = table.newReadBuilder();
        if (readType != null) {
            readBuilder.withReadType(readType); // this is the projection
        }
        readBuilder.withFilter(predicate); // this is the filter from the SQL WHERE clause
        if (limit != null) {
            readBuilder.withLimit(limit.intValue()); // this is the LIMIT
        }
        return readBuilder.dropStats();
    }
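
For reference, roughly the same pieces can be exercised through Paimon's public ReadBuilder API. The following sketch assumes a table whose field 0 is an INT column; the predicate and the projected column indices are made up:

import java.util.List;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;

public class ReadBuilderExample {

    /** Plans splits for "WHERE col0 > 10", reading only columns 0 and 2. */
    static List<Split> planSplits(Table table) {
        PredicateBuilder builder = new PredicateBuilder(table.rowType());
        Predicate predicate = builder.greaterThan(0, 10); // the WHERE filter

        ReadBuilder readBuilder =
                table.newReadBuilder()
                        .withFilter(predicate)             // same role as withFilter above
                        .withProjection(new int[] {0, 2}); // column-level pruning

        return readBuilder.newScan().plan().splits();
    }
}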

This is where Paimon really starts reading metadata; the core logic of the filtered read is inside plan():

readBuilder.newStreamScan().plan().splits();

Drilling down through plan() step by step, the final implementation is in:

SnapshotReaderImpl#read -> AbstractFileStoreScan#plan

org.apache.paimon.operation.AbstractFileStoreScan#plan
    public Plan plan() {
        long started = System.nanoTime();
        // Read the manifest list; the manifests returned here are already filtered (e.g. by partition).
        ManifestsReader.Result manifestsResult = readManifests();
        Snapshot snapshot = manifestsResult.snapshot;
        List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;

        // Read the manifest entries; per-file filtering happens while the entries are read.
        Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
        List<ManifestEntry> files = new ArrayList<>();
        while (iterator.hasNext()) {
            files.add(iterator.next());
        }

        if (wholeBucketFilterEnabled()) {
            // Group files by (partition, bucket) and filter each whole bucket by its stats.
            files =
                    files.stream()
                            .collect(
                                    Collectors.groupingBy(
                                            file -> Pair.of(file.partition(), file.bucket()),
                                            LinkedHashMap::new,
                                            Collectors.toList()))
                            .values()
                            .stream()
                            .map(this::filterWholeBucketByStats)
                            .flatMap(Collection::stream)
                            .collect(Collectors.toList());
        }

        List<ManifestEntry> result = files;
        long scanDuration = (System.nanoTime() - started) / 1_000_000;
        if (scanMetrics != null) {
            long allDataFiles =
                    manifestsResult.allManifests.stream()
                            .mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles())
                            .sum();
            scanMetrics.reportScan(
                    new ScanStats(
                            scanDuration,
                            manifests.size(),
                            allDataFiles - result.size(),
                            result.size()));
        }
        // ... the method ends by wrapping the snapshot and the filtered entries into the returned Plan (omitted here)
    }

The filtering logic all happens in the code above. AbstractFileStoreScan also has two subclasses: AppendOnlyFileStoreScan (append-only tables) and KeyValueFileStoreScan (primary-key tables).

The detailed filtering logic will not be expanded here; as shown below, each of these Scan classes carries its own filter fields:

AbstractFileStoreScan
    private Filter<Integer> levelFilter = null;
    private Filter<ManifestEntry> manifestEntryFilter = null;
    private Filter<String> fileNameFilter = null;
    private ManifestCacheFilter manifestCacheFilter = null;
KeyValueFileStoreScan
    private Predicate keyFilter;
    private Predicate valueFilter;
AppendOnlyFileStoreScan
    private Predicate filter;

These different filters are split out further at newScan time; a conceptual sketch of picking out a key filter follows below.
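
Conceptually, deriving a keyFilter for a primary-key table from a WHERE conjunction looks roughly like the following sketch (illustrative only; the real newScan logic also has to remap field indices between the value row and the key row):

import java.util.ArrayList;
import java.util.List;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;

/** Illustrative only: keep the AND conjuncts that reference primary-key fields. */
public class KeyFilterSketch {

    static Predicate pickKeyFilter(Predicate filter, List<String> primaryKeys) {
        List<Predicate> keyConjuncts = new ArrayList<>();
        for (Predicate conjunct : PredicateBuilder.splitAnd(filter)) {
            // Only simple leaf predicates on a primary-key column are considered here.
            if (conjunct instanceof LeafPredicate
                    && primaryKeys.contains(((LeafPredicate) conjunct).fieldName())) {
                keyConjuncts.add(conjunct);
            }
        }
        return keyConjuncts.isEmpty()
                ? null
                : PredicateBuilder.and(keyConjuncts.toArray(new Predicate[0]));
    }
}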

It is worth mentioning that file-index filtering may also happen while reading metadata: when a FileIndex file is small enough, it is stored inside the metadata itself. See the FileIndex article for details.

Read DataFile

Code entry point: ReaderOperator (the next Flink operator downstream of the consumer-id based read).

readBuilder.newRead().createReader(split)

The Split here is the metadata of the data files passed down from the upstream operator, and all of the filtering happens inside createReader(split): mainly deletion-vector filtering (row level), file-index filtering, and the column-reading interfaces of the various file formats.

The FileIndex filtering, the creation of the DeletionVector reader, and the pushed-down projection all appear in the code below; a simplified sketch of how a deletion vector skips rows follows after it.

org.apache.paimon.operation.RawFileSplitRead#createFileReader
private FileRecordReader<InternalRow> createFileReader(
            BinaryRow partition,
            DataFileMeta file,
            DataFilePathFactory dataFilePathFactory,
            FormatReaderMapping formatReaderMapping,
            IOExceptionSupplier<DeletionVector> dvFactory)
            throws IOException {
        FileIndexResult fileIndexResult = null;
        if (fileIndexReadEnabled) {
            fileIndexResult =
                    FileIndexEvaluator.evaluate(
                            fileIO,
                            formatReaderMapping.getDataSchema(),
                            formatReaderMapping.getDataFilters(),
                            dataFilePathFactory,
                            file); // the file index is evaluated here for filtering
            if (!fileIndexResult.remain()) { 
                return new EmptyFileRecordReader<>();
            }
        }

        FormatReaderContext formatReaderContext =
                new FormatReaderContext(
                        fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult);
        FileRecordReader<InternalRow> fileRecordReader =
                new DataFileRecordReader(
                        formatReaderMapping.getReaderFactory(),
                        formatReaderContext,
                        formatReaderMapping.getIndexMapping(), // the readDataFields from projection pruning map to this index mapping
                        formatReaderMapping.getCastMapping(),
                        PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));

        if (fileIndexResult instanceof BitmapIndexResult) {
            fileRecordReader =
                    new ApplyBitmapIndexRecordReader(
                            fileRecordReader, (BitmapIndexResult) fileIndexResult);
        }
        // the ApplyDeletionVectorReader is created here
        DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get();
        if (deletionVector != null && !deletionVector.isEmpty()) {
            return new ApplyDeletionVectorReader(fileRecordReader, deletionVector);
        }
        return fileRecordReader;
    }
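
To round things off, the row-level effect of a deletion vector can be shown with a simplified sketch (not the actual ApplyDeletionVectorReader): row positions that are marked in the vector are skipped while iterating:

import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Illustrative only: wrap a row list and skip positions marked as deleted. */
final class DeletionVectorSketch {

    /** Returns an iterator over the surviving rows, i.e. positions not set in the vector. */
    static <T> Iterator<T> applyDeletionVector(List<T> rows, BitSet deleted) {
        return new Iterator<T>() {
            private int pos = nextLive(0);

            private int nextLive(int from) {
                while (from < rows.size() && deleted.get(from)) {
                    from++; // this row position was deleted, skip it
                }
                return from;
            }

            @Override
            public boolean hasNext() {
                return pos < rows.size();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T row = rows.get(pos);
                pos = nextLive(pos + 1);
                return row;
            }
        };
    }
}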
