lookup join

Flink lookup 同步与异步调用的最终实现都是调用FileStoreLookupFunction,区别也就是在包了一层对象。

public static LookupRuntimeProvider create(
            FileStoreLookupFunction function, boolean enableAsync, int asyncThreadNumber) {
        NewLookupFunction lookup = new NewLookupFunction(function);
        return enableAsync
                ? AsyncLookupFunctionProvider.of(
                        new AsyncLookupFunctionWrapper(lookup, asyncThreadNumber))
                : LookupFunctionProvider.of(lookup);
    }

点进去最终实现也就是在lookupfunction中调用LookupTable。

public interface LookupTable extends Closeable {

    void specificPartitionFilter(Predicate filter);

    void open() throws Exception;

    List<InternalRow> get(InternalRow key) throws IOException;

    void refresh() throws Exception;

    void specifyCacheRowFilter(Filter<InternalRow> filter);
}

其中lookupTable有多种不同的实现,下面会进一步讲。

先说下LookupFunction都会做什么

open:根据不同的配置创建不同的lookupTable,之后调用lookupTable的open方法。

lookup:首先尝试tryRefresh(用于刷新lookupTable信息),之后调用lookupTable.get(key)获取返回结果包装成FlinkRow之后返回

FullCacheLookupTable

FullCache lookuptable 顾名思义就是将整张paimon表的数据都load到本地,这样在加载数据的时候只需要在本地加载数据就好,性能最高,但是由于数据都加载到本地,对磁盘要求较大,初始加载慢。又根据是否为主键表以及join key是否为主键又分为以下三种:NoprimaryKeylookupTable、primaryKeyLookupTable、SecondaryIndexLookupTable。

FullCacheLookupTable主要是依赖rocksDb+cache实现查询加速。

NoPrimaryKeyLookupTable

略。

PrimaryKeyLookupTable

我们这里简要说一下在open时读取以及更新数据时读取paimon表的步骤

open

public void open() throws Exception {
        openStateFactory(); //创建RockDB工厂类
        createTableState(); //创建RocksDb,以及Cache
        bootstrap(); // 读取paimon表数据写入到RocksDb SST文件
}

在bootstrap中会创建LookupStreamingReader读取paimon表中数据之后通过BinaryExternalSortBuffer进行排序,写入到内存,满了之后写磁盘,之后在将数据读取出来写入到RocksDb的SST,主键就是paimon表的key。

protected void bootstrap() throws Exception {
        Predicate scanPredicate =
                PredicateBuilder.andNullable(context.tablePredicate, specificPartition);
        this.reader =
                new LookupStreamingReader(
                        context.table,
                        context.projection,
                        scanPredicate,
                        context.requiredCachedBucketIds,
                        cacheRowFilter);  //此处还将projection传递给了下游可用于读取数据时的优化
        BinaryExternalSortBuffer bulkLoadSorter =
                RocksDBState.createBulkLoadSorter(
                        IOManager.create(context.tempPath.toString()), context.table.coreOptions());
        Predicate predicate = projectedPredicate();
        try (RecordReaderIterator<InternalRow> batch =
                new RecordReaderIterator<>(reader.nextBatch(true))) {
            while (batch.hasNext()) {
                InternalRow row = batch.next();
                if (predicate == null || predicate.test(row)) {
                    bulkLoadSorter.write(GenericRow.of(toKeyBytes(row), toValueBytes(row))); //读取数据排序写入。
                }
            }
        }

        MutableObjectIterator<BinaryRow> keyIterator = bulkLoadSorter.sortedIterator();
        BinaryRow row = new BinaryRow(2);
        TableBulkLoader bulkLoader = createBulkLoader();
        try {
            while ((row = keyIterator.next(row)) != null) {
                bulkLoader.write(row.getBinary(0), row.getBinary(1));//写入到RocksDb的SST
            }
        } catch (BulkLoader.WriteException e) {
      
        }

        bulkLoader.finish();
        bulkLoadSorter.clear();
    }

lookup->tryRefresh()

lookup时会首先尝试tryRefresh,如果需要做refresh时,会读取paimon表后续的snapshot的元数据文件,之后在根据快照读取数据,将数据更新到RocksDb中,之后让Cache中的对应key失效。在实现上Fangyong大佬还实现了个异步更新,具体实现就是开了个线程池,然后提交doRefresh任务

 public void refresh() throws Exception {
        if (refreshExecutor == null) {
            doRefresh();
            return;
        }
...
            doRefresh();
        } else {
            Future<?> currentFuture = null;
            try {
                currentFuture =
                        refreshExecutor.submit(
                                () -> {
                                    try {
                                        doRefresh();
                                    }
                                });
            } 
            if (currentFuture != null) {
                refreshFuture = currentFuture;
            }
        }
    }

其中doResh

private void doRefresh() throws Exception {
        while (true) {
            try (RecordReaderIterator<InternalRow> batch =
                    new RecordReaderIterator<>(reader.nextBatch(false))) {//读取数据
                if (!batch.hasNext()) {
                    return;
                }
                refresh(batch);//更新rocksdb,失效cache
            }
        }
    }
//读取数据的详细实现就是扫描出datasplits,然后用户通过并发或非并发的方式读取,可以看到真正读取数据文件的时候,里面还可以加一些过滤的优化
public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
        List<Split> splits = scan.plan().splits();
        FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
                split -> readBuilder.newRead().createReader(split);
        RecordReader<InternalRow> reader;
        if (useParallelism) {
            reader =
                    SplitsParallelReadUtil.parallelExecute(
                            readType,
                            readerSupplier,
                            splits,
                            options.pageSize(),
                            new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
        } else {
            List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
            for (Split split : splits) {
                readers.add(() -> readerSupplier.apply(split));
            }
            reader = ConcatRecordReader.create(readers);
        }
        if (projectedPredicate != null) {
            reader = reader.filter(projectedPredicate::test);//读取数据加projecttion的优化
        }

        if (cacheRowFilter != null) {
            reader = reader.filter(cacheRowFilter);
        }
        return reader;
    }

lookup->lookupTable.get(key)

这块逻辑主要是现在Cache中取出数据如果cache中没有就在RocksDb中取出,并存入cache

SecondaryIndexLookupTable

略。

PrimaryKeyPartialLookupTable

local Table

懒加载的方式,taskManager首先会将元数据加载进来,之后当需要lookup的数据进来之后,根据元数据信息找到需要读取的数据文件,在进行加载,返回结果。

remote Table

需要通过Procedure或者Action启动一个单独的Flink任务作为一个service服务。其他flink任务可以通过调用这个Service服务来返回结果。

标签: none

评论已关闭