Apache Paimon-lookup join实现
lookup join
Flink lookup 同步与异步调用的最终实现都是调用FileStoreLookupFunction,区别也就是在包了一层对象。
public static LookupRuntimeProvider create(
FileStoreLookupFunction function, boolean enableAsync, int asyncThreadNumber) {
NewLookupFunction lookup = new NewLookupFunction(function);
return enableAsync
? AsyncLookupFunctionProvider.of(
new AsyncLookupFunctionWrapper(lookup, asyncThreadNumber))
: LookupFunctionProvider.of(lookup);
}
点进去最终实现也就是在lookupfunction中调用LookupTable。
public interface LookupTable extends Closeable {
void specificPartitionFilter(Predicate filter);
void open() throws Exception;
List<InternalRow> get(InternalRow key) throws IOException;
void refresh() throws Exception;
void specifyCacheRowFilter(Filter<InternalRow> filter);
}
其中lookupTable有多种不同的实现,下面会进一步讲。
先说下LookupFunction都会做什么
open:根据不同的配置创建不同的lookupTable,之后调用lookupTable的open方法。
lookup:首先尝试tryRefresh(用于刷新lookupTable信息),之后调用lookupTable.get(key)获取返回结果包装成FlinkRow之后返回
FullCacheLookupTable
FullCache lookuptable 顾名思义就是将整张paimon表的数据都load到本地,这样在加载数据的时候只需要在本地加载数据就好,性能最高,但是由于数据都加载到本地,对磁盘要求较大,初始加载慢。又根据是否为主键表以及join key是否为主键又分为以下三种:NoprimaryKeylookupTable、primaryKeyLookupTable、SecondaryIndexLookupTable。
FullCacheLookupTable主要是依赖rocksDb+cache实现查询加速。
NoPrimaryKeyLookupTable
略。
PrimaryKeyLookupTable
我们这里简要说一下在open时读取以及更新数据时读取paimon表的步骤
open
public void open() throws Exception {
openStateFactory(); //创建RockDB工厂类
createTableState(); //创建RocksDb,以及Cache
bootstrap(); // 读取paimon表数据写入到RocksDb SST文件
}
在bootstrap中会创建LookupStreamingReader读取paimon表中数据之后通过BinaryExternalSortBuffer进行排序,写入到内存,满了之后写磁盘,之后在将数据读取出来写入到RocksDb的SST,主键就是paimon表的key。
protected void bootstrap() throws Exception {
Predicate scanPredicate =
PredicateBuilder.andNullable(context.tablePredicate, specificPartition);
this.reader =
new LookupStreamingReader(
context.table,
context.projection,
scanPredicate,
context.requiredCachedBucketIds,
cacheRowFilter); //此处还将projection传递给了下游可用于读取数据时的优化
BinaryExternalSortBuffer bulkLoadSorter =
RocksDBState.createBulkLoadSorter(
IOManager.create(context.tempPath.toString()), context.table.coreOptions());
Predicate predicate = projectedPredicate();
try (RecordReaderIterator<InternalRow> batch =
new RecordReaderIterator<>(reader.nextBatch(true))) {
while (batch.hasNext()) {
InternalRow row = batch.next();
if (predicate == null || predicate.test(row)) {
bulkLoadSorter.write(GenericRow.of(toKeyBytes(row), toValueBytes(row))); //读取数据排序写入。
}
}
}
MutableObjectIterator<BinaryRow> keyIterator = bulkLoadSorter.sortedIterator();
BinaryRow row = new BinaryRow(2);
TableBulkLoader bulkLoader = createBulkLoader();
try {
while ((row = keyIterator.next(row)) != null) {
bulkLoader.write(row.getBinary(0), row.getBinary(1));//写入到RocksDb的SST
}
} catch (BulkLoader.WriteException e) {
}
bulkLoader.finish();
bulkLoadSorter.clear();
}
lookup->tryRefresh()
lookup时会首先尝试tryRefresh,如果需要做refresh时,会读取paimon表后续的snapshot的元数据文件,之后在根据快照读取数据,将数据更新到RocksDb中,之后让Cache中的对应key失效。在实现上Fangyong大佬还实现了个异步更新,具体实现就是开了个线程池,然后提交doRefresh任务
public void refresh() throws Exception {
if (refreshExecutor == null) {
doRefresh();
return;
}
...
doRefresh();
} else {
Future<?> currentFuture = null;
try {
currentFuture =
refreshExecutor.submit(
() -> {
try {
doRefresh();
}
});
}
if (currentFuture != null) {
refreshFuture = currentFuture;
}
}
}
其中doResh
private void doRefresh() throws Exception {
while (true) {
try (RecordReaderIterator<InternalRow> batch =
new RecordReaderIterator<>(reader.nextBatch(false))) {//读取数据
if (!batch.hasNext()) {
return;
}
refresh(batch);//更新rocksdb,失效cache
}
}
}
//读取数据的详细实现就是扫描出datasplits,然后用户通过并发或非并发的方式读取,可以看到真正读取数据文件的时候,里面还可以加一些过滤的优化
public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
List<Split> splits = scan.plan().splits();
FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
split -> readBuilder.newRead().createReader(split);
RecordReader<InternalRow> reader;
if (useParallelism) {
reader =
SplitsParallelReadUtil.parallelExecute(
readType,
readerSupplier,
splits,
options.pageSize(),
new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
} else {
List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : splits) {
readers.add(() -> readerSupplier.apply(split));
}
reader = ConcatRecordReader.create(readers);
}
if (projectedPredicate != null) {
reader = reader.filter(projectedPredicate::test);//读取数据加projecttion的优化
}
if (cacheRowFilter != null) {
reader = reader.filter(cacheRowFilter);
}
return reader;
}
lookup->lookupTable.get(key)
这块逻辑主要是现在Cache中取出数据如果cache中没有就在RocksDb中取出,并存入cache
SecondaryIndexLookupTable
略。
PrimaryKeyPartialLookupTable
local Table
懒加载的方式,taskManager首先会将元数据加载进来,之后当需要lookup的数据进来之后,根据元数据信息找到需要读取的数据文件,在进行加载,返回结果。
remote Table
需要通过Procedure或者Action启动一个单独的Flink任务作为一个service服务。其他flink任务可以通过调用这个Service服务来返回结果。
评论已关闭