Skip to content

Commit a70e655

Browse files
committed
feat: support runtime filter push down to Table AM
when seq scan begins, check whether the scanflags of table am is set to determine whether the runtime filter is pushed down. When the runtime filter is pushed down to pax am, pax am converts the min/max scankey in the runtime filter into PFTNode and performs min/max filtering.
1 parent 92e3374 commit a70e655

File tree

7 files changed

+82
-14
lines changed

7 files changed

+82
-14
lines changed

contrib/pax_storage/src/cpp/access/pax_access_handle.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ uint32 PaxAccessMethod::ScanFlags(Relation relation) {
453453
flags |= SCAN_FORCE_BIG_WRITE_LOCK;
454454
#endif
455455

456+
flags |= SCAN_SUPPORT_RUNTIME_FILTER;
456457
return flags;
457458
}
458459

contrib/pax_storage/src/cpp/access/pax_scanner.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ bool PaxScanDesc::BitmapNextTuple(struct TBMIterateResult *tbmres,
218218
}
219219

220220
TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
221-
int nkeys, struct ScanKeyData * /*key*/,
221+
int nkeys, struct ScanKeyData *key,
222222
ParallelTableScanDesc pscan, uint32 flags,
223223
std::shared_ptr<PaxFilter> &&pax_filter,
224224
bool build_bitmap) {
@@ -326,8 +326,8 @@ void PaxScanDesc::EndScan() {
326326
}
327327

328328
TableScanDesc PaxScanDesc::BeginScanExtractColumns(
329-
Relation rel, Snapshot snapshot, int /*nkeys*/,
330-
struct ScanKeyData * /*key*/, ParallelTableScanDesc parallel_scan,
329+
Relation rel, Snapshot snapshot, int nkeys,
330+
struct ScanKeyData *key, ParallelTableScanDesc parallel_scan,
331331
struct PlanState *ps, uint32 flags) {
332332
std::shared_ptr<PaxFilter> filter;
333333
List *targetlist = ps->plan->targetlist;
@@ -361,7 +361,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
361361
filter->SetColumnProjection(std::move(col_bits));
362362

363363
if (pax_enable_sparse_filter) {
364-
filter->InitSparseFilter(rel, qual);
364+
filter->InitSparseFilter(rel, qual, key, nkeys);
365365

366366
// FIXME: enable predicate pushdown can filter rows immediately without
367367
// assigning all columns. But it may mess the filter orders for multiple
@@ -375,7 +375,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
375375
filter->InitRowFilter(rel, ps, filter->GetColumnProjection());
376376
}
377377
}
378-
return BeginScan(rel, snapshot, 0, nullptr, parallel_scan, flags,
378+
return BeginScan(rel, snapshot, nkeys, key, parallel_scan, flags,
379379
std::move(filter), build_bitmap);
380380
}
381381

contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ namespace pax {
4444
PaxFilter::PaxFilter() : sparse_filter_(nullptr), row_filter_(nullptr) {}
4545

4646
void PaxFilter::InitSparseFilter(Relation relation, List *quals,
47+
ScanKey key, int nkeys,
4748
bool allow_fallback_to_pg) {
4849
Assert(!sparse_filter_);
4950
sparse_filter_ =
5051
std::make_shared<PaxSparseFilter>(relation, allow_fallback_to_pg);
51-
sparse_filter_->Initialize(quals);
52+
sparse_filter_->Initialize(quals, key, nkeys);
5253
}
5354

5455
#ifdef VEC_BUILD

contrib/pax_storage/src/cpp/storage/filter/pax_filter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class PaxFilter final {
5050
~PaxFilter() = default;
5151

5252
// The sparse filter
53-
void InitSparseFilter(Relation relation, List *quals,
53+
void InitSparseFilter(Relation relation, List *quals, ScanKey key, int nkeys,
5454
bool allow_fallback_to_pg = false);
5555
#ifdef VEC_BUILD
5656
void InitSparseFilter(

contrib/pax_storage/src/cpp/storage/filter/pax_sparse_filter.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class PaxSparseFilter final {
6565

6666
bool ExistsFilterPath() const;
6767

68-
void Initialize(List *quals);
68+
void Initialize(List *quals, ScanKey key, int nkeys);
6969

7070
#ifdef VEC_BUILD
7171
void Initialize(
@@ -83,6 +83,8 @@ class PaxSparseFilter final {
8383
private:
8484
#endif
8585

86+
std::shared_ptr<PFTNode> ProcessScanKey(ScanKey key);
87+
8688
// Used to build the filter tree with the PG quals
8789
std::shared_ptr<PFTNode> ExprWalker(Expr *expr);
8890
Expr *ExprFlatVar(Expr *expr);

contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@
3636

3737
namespace pax {
3838

39-
void PaxSparseFilter::Initialize(List *quals) {
39+
void PaxSparseFilter::Initialize(List *quals, ScanKey key, int nkeys) {
4040
ListCell *qual_cell;
4141
std::vector<std::shared_ptr<PFTNode>> fl_nodes; /* first level nodes */
4242
std::string origin_tree_str;
4343

4444
// no inited
4545
Assert(!filter_tree_);
4646

47-
if (!quals) {
47+
if (!quals && nkeys == 0) {
4848
return;
4949
}
5050

@@ -57,6 +57,23 @@ void PaxSparseFilter::Initialize(List *quals) {
5757
fl_nodes.emplace_back(std::move(fl_node));
5858
}
5959

60+
// walk scan key and only support min/max filter now
61+
for (int i = 0; i < nkeys; i++) {
62+
// TODO: support bloom filter in PaxFilter
63+
// but now just skip it, SeqNext() will check bloom filter in PassByBloomFilter()
64+
if (key[i].sk_flags & SK_BLOOM_FILTER) {
65+
continue;
66+
}
67+
68+
if (key[i].sk_strategy != BTGreaterEqualStrategyNumber &&
69+
key[i].sk_strategy != BTLessEqualStrategyNumber) {
70+
continue;
71+
}
72+
std::shared_ptr<PFTNode> fl_node = ProcessScanKey(&key[i]);
73+
Assert(fl_node);
74+
fl_nodes.emplace_back(std::move(fl_node));
75+
}
76+
6077
// build the root of `filter_tree_`
6178
BuildPFTRoot(fl_nodes);
6279
if (pax_log_filter_tree) origin_tree_str = DebugString();
@@ -67,6 +84,47 @@ void PaxSparseFilter::Initialize(List *quals) {
6784
origin_tree_str.c_str(), DebugString().c_str());
6885
}
6986

87+
std::shared_ptr<PFTNode> PaxSparseFilter::ProcessScanKey(ScanKey key) {
88+
std::shared_ptr<PFTNode> node = nullptr;
89+
Assert(key);
90+
Assert(!(key->sk_flags & SK_BLOOM_FILTER));
91+
Assert(key->sk_strategy == BTGreaterEqualStrategyNumber ||
92+
key->sk_strategy == BTLessEqualStrategyNumber);
93+
Assert(key->sk_attno > 0 &&
94+
key->sk_attno <= RelationGetNumberOfAttributes(rel_));
95+
96+
AttrNumber attno = key->sk_attno;
97+
98+
// Build VarNode on the left
99+
auto var_node = std::make_shared<VarNode>();
100+
var_node->attrno = attno;
101+
102+
// Build ConstNode on the right from ScanKey
103+
auto const_node = std::make_shared<ConstNode>();
104+
const_node->const_val = key->sk_argument;
105+
const_node->const_type = key->sk_subtype;
106+
if (key->sk_flags & SK_ISNULL) {
107+
const_node->sk_flags |= SK_ISNULL;
108+
}
109+
110+
// Build OpNode and attach children: (var, const)
111+
auto op_node = std::make_shared<OpNode>();
112+
op_node->strategy = key->sk_strategy;
113+
op_node->collation = key->sk_collation; // may be InvalidOid; executor will
114+
// fallback to attr collation
115+
116+
// Set operand types
117+
Form_pg_attribute attr = TupleDescAttr(RelationGetDescr(rel_), attno - 1);
118+
op_node->left_typid = attr->atttypid;
119+
op_node->right_typid = key->sk_subtype;
120+
121+
PFTNode::AppendSubNode(op_node, std::move(var_node));
122+
PFTNode::AppendSubNode(op_node, std::move(const_node));
123+
124+
node = op_node;
125+
return node;
126+
}
127+
70128
Expr *PaxSparseFilter::ExprFlatVar(Expr *clause) {
71129
Expr *flat_clause = clause;
72130
if (unlikely(!clause)) {

src/backend/executor/nodeSeqscan.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,18 @@ SeqNext(SeqScanState *node)
8080
* node->filter_in_seqscan is false means scankey need to be pushed to
8181
* AM.
8282
*/
83-
if (gp_enable_runtime_filter_pushdown && !node->filter_in_seqscan)
83+
if (gp_enable_runtime_filter_pushdown && node->filters &&
84+
(table_scan_flags(node->ss.ss_currentRelation) &
85+
(SCAN_SUPPORT_RUNTIME_FILTER)))
86+
{
87+
// pushdown runtime filter to AM
8488
keys = ScanKeyListToArray(node->filters, &nkeys);
89+
}
8590

8691
/*
87-
* We reach here if the scan is not parallel, or if we're serially
88-
* executing a scan that was planned to be parallel.
89-
*/
92+
* We reach here if the scan is not parallel, or if we're serially
93+
* executing a scan that was planned to be parallel.
94+
*/
9095
scandesc = table_beginscan_es(node->ss.ss_currentRelation,
9196
estate->es_snapshot,
9297
nkeys, keys,
@@ -102,6 +107,7 @@ SeqNext(SeqScanState *node)
102107
{
103108
while (table_scan_getnextslot(scandesc, direction, slot))
104109
{
110+
// TODO: later pushdown bloom filter to AM
105111
if (!PassByBloomFilter(&node->ss.ps, node->filters, slot))
106112
continue;
107113

0 commit comments

Comments
 (0)