Apache Paimon: Filter Implementation
In the article on FlinkSQL parsing we saw how filters travel from SQL down to Paimon. In this article we look at how Paimon uses these filters for optimization once it receives them.
Flink PushDown
The interfaces through which Flink pushes filtering information down to Paimon:
1. SupportsFilterPushDown pushes down the filters from the WHERE clause. Note that the result consists of two lists: acceptedFilters are the filters the source node can apply itself, remainingFilters are the ones it cannot. acceptedFilters help Flink optimize the execution plan: a filter that would otherwise have to be evaluated in the DataStream can be applied by the source node while reading, so Flink no longer needs to generate the corresponding operator. (A sketch of a possible applyFilters implementation follows the two interface definitions below.)
2. SupportsProjectionPushDown pushes down the projection, i.e. the columns chosen in the SELECT clause. The not-yet-released Paimon 1.0 (master branch) already supports pushing down nested types (a row nested inside another row). This pruning mainly pays off when reading Parquet or ORC files, whose readers can fetch only the requested columns.
public interface SupportsFilterPushDown {

    Result applyFilters(List<ResolvedExpression> filters);

    final class Result {
        private final List<ResolvedExpression> acceptedFilters;
        private final List<ResolvedExpression> remainingFilters;

        private Result(
                List<ResolvedExpression> acceptedFilters,
                List<ResolvedExpression> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }
    }
}
public interface SupportsProjectionPushDown {

    boolean supportsNestedProjection();

    @Deprecated
    default void applyProjection(int[][] projectedFields) {}

    default void applyProjection(int[][] projectedFields, DataType producedDataType) {
        applyProjection(projectedFields);
    }
}
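As a concrete illustration of the accepted/remaining contract, here is a minimal sketch (hypothetical, not Paimon's actual source) of how a source's applyFilters could split the incoming expressions. The canPushDown helper is a placeholder for a real expression-to-predicate converter.

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

public class ExampleFilterPushDown {

    // placeholder: a real connector would try to convert the expression into its own predicate
    private static boolean canPushDown(ResolvedExpression expr) {
        return expr.asSummaryString().contains("=");
    }

    public static SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            if (canPushDown(filter)) {
                accepted.add(filter);  // the source applies this filter while reading
            } else {
                remaining.add(filter); // Flink keeps a Filter operator for this one
            }
        }
        return SupportsFilterPushDown.Result.of(accepted, remaining);
    }
}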
Paimon Filter Optimization
Based on the master branch, before the 1.0 release.
Paimon's filtering falls into two parts overall:
1. When reading metadata, use the statistics stored there to narrow down the DataSplits that actually need to be read before handing them to downstream operators.
2. When reading data files, filter with the file index, deletion vector information (row-level filtering), and projection information (column-level filtering).
Read Meta
Code: MonitorSource (i.e. the consumer-id style of streaming consumption)
Paimon's metadata keeps many statistics, such as max/min/nullCount for partition fields, and max/min/nullCount for both the keys and the values of data files.
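To see how such statistics prune reads, here is a simplified, self-contained sketch (illustration only, not Paimon's actual code) of min/max-based skipping for a predicate like a > 100:

public class StatsPruning {

    // hypothetical per-file statistics for one column
    static final class ColumnStats {
        final long min;
        final long max;
        final long nullCount;

        ColumnStats(long min, long max, long nullCount) {
            this.min = min;
            this.max = max;
            this.nullCount = nullCount;
        }
    }

    // returns true if the file might contain rows satisfying "a > literal" and must be read
    static boolean mightContainGreaterThan(ColumnStats stats, long literal) {
        return stats.max > literal;
    }

    public static void main(String[] args) {
        ColumnStats fileA = new ColumnStats(1, 50, 0);   // max = 50  -> can be skipped for a > 100
        ColumnStats fileB = new ColumnStats(80, 200, 3); // max = 200 -> must be read
        System.out.println(mightContainGreaterThan(fileA, 100)); // false
        System.out.println(mightContainGreaterThan(fileB, 100)); // true
    }
}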
While building the source in FlinkSourceBuilder, Paimon stores the relevant filter information into the ReadBuilder:
org.apache.paimon.flink.source.FlinkSourceBuilder#createReadBuilder
private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType readType) {
    ReadBuilder readBuilder = table.newReadBuilder();
    if (readType != null) {
        readBuilder.withReadType(readType); // this is the projection
    }
    readBuilder.withFilter(predicate); // this is the filter from the SQL WHERE clause
    if (limit != null) {
        readBuilder.withLimit(limit.intValue()); // this is the LIMIT
    }
    return readBuilder.dropStats();
}
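For reference, ReadBuilder is also Paimon's public read API, so the same two filtering stages can be triggered directly. A minimal sketch (the table handle, column index and projection are placeholders) might look like this:

import java.util.List;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;

public class ReadBuilderExample {

    public static void scanWithFilter(Table table) throws Exception {
        // predicate equivalent to: WHERE <column 0> > 100
        PredicateBuilder builder = new PredicateBuilder(table.rowType());
        Predicate predicate = builder.greaterThan(0, 100);

        ReadBuilder readBuilder =
                table.newReadBuilder()
                        .withFilter(predicate)             // pushed into scan and read
                        .withProjection(new int[] {0, 1}); // column-level pruning

        // stage 1: plan() prunes manifests/files by statistics and returns the splits
        List<Split> splits = readBuilder.newScan().plan().splits();

        // stage 2: createReader() applies file index, deletion vectors and projection
        TableRead read = readBuilder.newRead();
        try (RecordReader<InternalRow> reader = read.createReader(splits)) {
            reader.forEachRemaining(row -> System.out.println(row.getInt(0)));
        }
    }
}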
Paimon actually starts reading the metadata when the scan is planned; the core filtering logic lives inside plan():
readBuilder.newStreamScan().plan().splits();
Drilling down through plan() layer by layer, the final implementation is in
SnapshotReaderImpl#read->AbstractFileStoreScan#plan
org.apache.paimon.operation.AbstractFileStoreScan#plan
public Plan plan() {
    long started = System.nanoTime();
    // manifests are already filtered here (see ManifestsReader / filteredManifests)
    ManifestsReader.Result manifestsResult = readManifests();
    Snapshot snapshot = manifestsResult.snapshot;
    List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;

    Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
    List<ManifestEntry> files = new ArrayList<>();
    while (iterator.hasNext()) {
        files.add(iterator.next());
    }

    if (wholeBucketFilterEnabled()) {
        // group the entries of each (partition, bucket) so that value filters can be
        // evaluated against the whole bucket
        files =
                files.stream()
                        .collect(
                                Collectors.groupingBy(
                                        file -> Pair.of(file.partition(), file.bucket()),
                                        LinkedHashMap::new,
                                        Collectors.toList()))
                        .values()
                        .stream()
                        .map(this::filterWholeBucketByStats)
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList());
    }

    List<ManifestEntry> result = files;

    long scanDuration = (System.nanoTime() - started) / 1_000_000;
    if (scanMetrics != null) {
        long allDataFiles =
                manifestsResult.allManifests.stream()
                        .mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles())
                        .sum();
        scanMetrics.reportScan(
                new ScanStats(
                        scanDuration,
                        manifests.size(),
                        allDataFiles - result.size(),
                        result.size()));
    }
    // ... (construction of the returned Plan from `snapshot` and `result` is omitted in this excerpt)
}
All of the filtering happens in the code above. AbstractFileStoreScan has two subclasses: AppendOnlyFileStoreScan (append-only tables) and KeyValueFileStoreScan (primary-key tables).
We will not walk through the concrete filtering logic here, but note that these scans hold different filter fields:
AbstractFileStoreScan
    private Filter<Integer> levelFilter = null;
    private Filter<ManifestEntry> manifestEntryFilter = null;
    private Filter<String> fileNameFilter = null;
    private ManifestCacheFilter manifestCacheFilter = null;
KeyValueFileStoreScan
    private Predicate keyFilter;
    private Predicate valueFilter;
AppendOnlyFileStoreScan
    private Predicate filter;
These different filters are further split apart from the pushed-down predicate when newScan is called (a rough sketch of this split follows).
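As a rough illustration of why the split matters (a simplification, not KeyValueFileStoreScan's actual logic): for a primary-key table, only predicates on key fields can safely prune individual files, while predicates on value columns must be evaluated more conservatively (per whole bucket, as filterWholeBucketByStats above does), because an older file may hold an outdated value for a key whose latest version lives in another file. A hypothetical split of an AND-ed predicate:

import java.util.List;
import java.util.Set;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;

public class SplitKeyValueFilters {

    // hypothetical helper: route each conjunct of "a > 1 AND b = 2" to the key or value side
    static void split(
            Predicate filter,
            Set<String> primaryKeyFields,
            List<Predicate> keyFilters,
            List<Predicate> valueFilters) {
        for (Predicate p : PredicateBuilder.splitAnd(filter)) {
            boolean onKey =
                    p instanceof LeafPredicate
                            && primaryKeyFields.contains(((LeafPredicate) p).fieldName());
            (onKey ? keyFilters : valueFilters).add(p);
        }
    }
}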
It is worth mentioning that file index filtering can also happen during the Read Meta phase: when the FileIndex data is small, it is embedded in the metadata itself. See the FileIndex article for details.
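As a reminder of what a file index contributes (a generic illustration, not Paimon's FileIndex format): an index can only answer "definitely no matching row" or "maybe", which is exactly the skip-or-read decision made by FileIndexEvaluator in the next section.

import java.util.Set;

public class FileIndexSketch {

    // hypothetical per-file index: hashes of the values of one column inside the file
    static boolean maybeContains(Set<Integer> indexedHashes, Object literal) {
        // false -> the file certainly contains no matching row and can be skipped
        // true  -> the file may contain matches (false positives possible) and must be read
        return indexedHashes.contains(literal.hashCode());
    }

    public static void main(String[] args) {
        Set<Integer> fileIndex = Set.of("user-1".hashCode(), "user-7".hashCode());
        System.out.println(maybeContains(fileIndex, "user-7")); // true  -> read the file
        System.out.println(maybeContains(fileIndex, "user-9")); // false -> skip the file
    }
}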
Read DataFile
Code: ReadOperator (the Flink operator downstream of the consumer-id based source)
readBuilder.newRead().createReader(split)
This split is the data-file metadata sent from the upstream operator, and the remaining filtering all happens inside createReader(split): deletion vector (row-level) filtering, file index filtering, and the column-reading interfaces of the underlying file formats.
The FileIndex filtering, the creation of the DeletionVector reader, and the pushed-down projection all show up in the code below.
org.apache.paimon.operation.RawFileSplitRead#createFileReader
private FileRecordReader<InternalRow> createFileReader(
        BinaryRow partition,
        DataFileMeta file,
        DataFilePathFactory dataFilePathFactory,
        FormatReaderMapping formatReaderMapping,
        IOExceptionSupplier<DeletionVector> dvFactory)
        throws IOException {
    FileIndexResult fileIndexResult = null;
    if (fileIndexReadEnabled) {
        fileIndexResult =
                FileIndexEvaluator.evaluate(
                        fileIO,
                        formatReaderMapping.getDataSchema(),
                        formatReaderMapping.getDataFilters(),
                        dataFilePathFactory,
                        file); // file index filtering happens here
        if (!fileIndexResult.remain()) {
            return new EmptyFileRecordReader<>();
        }
    }

    FormatReaderContext formatReaderContext =
            new FormatReaderContext(
                    fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult);
    FileRecordReader<InternalRow> fileRecordReader =
            new DataFileRecordReader(
                    formatReaderMapping.getReaderFactory(),
                    formatReaderContext,
                    formatReaderMapping.getIndexMapping(), // readDataFields from the pushed-down projection
                    formatReaderMapping.getCastMapping(),
                    PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));

    if (fileIndexResult instanceof BitmapIndexResult) {
        fileRecordReader =
                new ApplyBitmapIndexRecordReader(
                        fileRecordReader, (BitmapIndexResult) fileIndexResult);
    }

    // the ApplyDeletionVectorReader is created here
    DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get();
    if (deletionVector != null && !deletionVector.isEmpty()) {
        return new ApplyDeletionVectorReader(fileRecordReader, deletionVector);
    }
    return fileRecordReader;
}
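To make the row-level step concrete, here is a simplified sketch (illustration only, not the real ApplyDeletionVectorReader): a deletion vector is essentially a bitmap of deleted row positions inside one data file, and the wrapping reader drops those positions before rows reach downstream operators.

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class DeletionVectorSketch {

    // keep only the rows whose position is not marked as deleted
    static List<String> applyDeletionVector(List<String> rowsInFile, BitSet deletedPositions) {
        List<String> remaining = new ArrayList<>();
        for (int pos = 0; pos < rowsInFile.size(); pos++) {
            if (!deletedPositions.get(pos)) {
                remaining.add(rowsInFile.get(pos));
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r0", "r1", "r2", "r3");
        BitSet deleted = new BitSet();
        deleted.set(1); // the row at position 1 was deleted by a later update
        System.out.println(applyDeletionVector(rows, deleted)); // [r0, r2, r3]
    }
}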