Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions contrib/pax_storage/src/cpp/access/pax_access_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ uint32 PaxAccessMethod::ScanFlags(Relation relation) {
flags |= SCAN_FORCE_BIG_WRITE_LOCK;
#endif

flags |= SCAN_SUPPORT_RUNTIME_FILTER;
return flags;
}

Expand Down
10 changes: 5 additions & 5 deletions contrib/pax_storage/src/cpp/access/pax_scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ bool PaxScanDesc::BitmapNextTuple(struct TBMIterateResult *tbmres,
}

TableScanDesc PaxScanDesc::BeginScan(Relation relation, Snapshot snapshot,
int nkeys, struct ScanKeyData * /*key*/,
int nkeys, struct ScanKeyData *key,
ParallelTableScanDesc pscan, uint32 flags,
std::shared_ptr<PaxFilter> &&pax_filter,
bool build_bitmap) {
Expand Down Expand Up @@ -326,8 +326,8 @@ void PaxScanDesc::EndScan() {
}

TableScanDesc PaxScanDesc::BeginScanExtractColumns(
Relation rel, Snapshot snapshot, int /*nkeys*/,
struct ScanKeyData * /*key*/, ParallelTableScanDesc parallel_scan,
Relation rel, Snapshot snapshot, int nkeys,
struct ScanKeyData *key, ParallelTableScanDesc parallel_scan,
struct PlanState *ps, uint32 flags) {
std::shared_ptr<PaxFilter> filter;
List *targetlist = ps->plan->targetlist;
Expand Down Expand Up @@ -361,7 +361,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
filter->SetColumnProjection(std::move(col_bits));

if (pax_enable_sparse_filter) {
filter->InitSparseFilter(rel, qual);
filter->InitSparseFilter(rel, qual, key, nkeys);

// FIXME: enable predicate pushdown can filter rows immediately without
// assigning all columns. But it may mess the filter orders for multiple
Expand All @@ -375,7 +375,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
filter->InitRowFilter(rel, ps, filter->GetColumnProjection());
}
}
return BeginScan(rel, snapshot, 0, nullptr, parallel_scan, flags,
return BeginScan(rel, snapshot, nkeys, key, parallel_scan, flags,
std::move(filter), build_bitmap);
}

Expand Down
3 changes: 2 additions & 1 deletion contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ namespace pax {
PaxFilter::PaxFilter() : sparse_filter_(nullptr), row_filter_(nullptr) {}

void PaxFilter::InitSparseFilter(Relation relation, List *quals,
ScanKey key, int nkeys,
bool allow_fallback_to_pg) {
Assert(!sparse_filter_);
sparse_filter_ =
std::make_shared<PaxSparseFilter>(relation, allow_fallback_to_pg);
sparse_filter_->Initialize(quals);
sparse_filter_->Initialize(quals, key, nkeys);
}

#ifdef VEC_BUILD
Expand Down
2 changes: 1 addition & 1 deletion contrib/pax_storage/src/cpp/storage/filter/pax_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class PaxFilter final {
~PaxFilter() = default;

// The sparse filter
void InitSparseFilter(Relation relation, List *quals,
void InitSparseFilter(Relation relation, List *quals, ScanKey key, int nkeys,
bool allow_fallback_to_pg = false);
#ifdef VEC_BUILD
void InitSparseFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class PaxSparseFilter final {

bool ExistsFilterPath() const;

void Initialize(List *quals);
void Initialize(List *quals, ScanKey key, int nkeys);

#ifdef VEC_BUILD
void Initialize(
Expand All @@ -83,6 +83,8 @@ class PaxSparseFilter final {
private:
#endif

std::shared_ptr<PFTNode> ProcessScanKey(ScanKey key);

// Used to build the filter tree with the PG quals
std::shared_ptr<PFTNode> ExprWalker(Expr *expr);
Expr *ExprFlatVar(Expr *expr);
Expand Down
62 changes: 60 additions & 2 deletions contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,35 @@

namespace pax {

void PaxSparseFilter::Initialize(List *quals) {
void PaxSparseFilter::Initialize(List *quals, ScanKey key, int nkeys) {
ListCell *qual_cell;
std::vector<std::shared_ptr<PFTNode>> fl_nodes; /* first level nodes */
std::string origin_tree_str;

// no inited
Assert(!filter_tree_);

if (!quals) {
if (!quals && nkeys == 0) {
return;
}

// walk scan key and only support min/max filter now
for (int i = 0; i < nkeys; i++) {
// TODO: support bloom filter in PaxFilter
// but now just skip it, SeqNext() will check bloom filter in PassByBloomFilter()
if (key[i].sk_flags & SK_BLOOM_FILTER) {
continue;
}

if (key[i].sk_strategy != BTGreaterEqualStrategyNumber &&
key[i].sk_strategy != BTLessEqualStrategyNumber) {
continue;
}
std::shared_ptr<PFTNode> fl_node = ProcessScanKey(&key[i]);
Assert(fl_node);
fl_nodes.emplace_back(std::move(fl_node));
}

foreach (qual_cell, quals) {
Expr *fl_clause = (Expr *)lfirst(qual_cell);
std::shared_ptr<PFTNode> fl_node = ExprWalker(fl_clause);
Expand All @@ -67,6 +84,47 @@ void PaxSparseFilter::Initialize(List *quals) {
origin_tree_str.c_str(), DebugString().c_str());
}

std::shared_ptr<PFTNode> PaxSparseFilter::ProcessScanKey(ScanKey key) {
std::shared_ptr<PFTNode> node = nullptr;
Assert(key);
Assert(!(key->sk_flags & SK_BLOOM_FILTER));
Assert(key->sk_strategy == BTGreaterEqualStrategyNumber ||
key->sk_strategy == BTLessEqualStrategyNumber);
Assert(key->sk_attno > 0 &&
key->sk_attno <= RelationGetNumberOfAttributes(rel_));

AttrNumber attno = key->sk_attno;

// Build VarNode on the left
auto var_node = std::make_shared<VarNode>();
var_node->attrno = attno;

// Build ConstNode on the right from ScanKey
auto const_node = std::make_shared<ConstNode>();
const_node->const_val = key->sk_argument;
const_node->const_type = key->sk_subtype;
if (key->sk_flags & SK_ISNULL) {
const_node->sk_flags |= SK_ISNULL;
}

// Build OpNode and attach children: (var, const)
auto op_node = std::make_shared<OpNode>();
op_node->strategy = key->sk_strategy;
op_node->collation = key->sk_collation; // may be InvalidOid; executor will
// fallback to attr collation

// Set operand types
Form_pg_attribute attr = TupleDescAttr(RelationGetDescr(rel_), attno - 1);
op_node->left_typid = attr->atttypid;
op_node->right_typid = key->sk_subtype;

PFTNode::AppendSubNode(op_node, std::move(var_node));
PFTNode::AppendSubNode(op_node, std::move(const_node));

node = op_node;
return node;
}

Expr *PaxSparseFilter::ExprFlatVar(Expr *clause) {
Expr *flat_clause = clause;
if (unlikely(!clause)) {
Expand Down
14 changes: 10 additions & 4 deletions src/backend/executor/nodeSeqscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,18 @@ SeqNext(SeqScanState *node)
* node->filter_in_seqscan is false means scankey need to be pushed to
* AM.
*/
if (gp_enable_runtime_filter_pushdown && !node->filter_in_seqscan)
if (gp_enable_runtime_filter_pushdown && node->filter_in_seqscan && node->filters &&
(table_scan_flags(node->ss.ss_currentRelation) &
(SCAN_SUPPORT_RUNTIME_FILTER)))
{
// pushdown runtime filter to AM
keys = ScanKeyListToArray(node->filters, &nkeys);
}

/*
* We reach here if the scan is not parallel, or if we're serially
* executing a scan that was planned to be parallel.
*/
* We reach here if the scan is not parallel, or if we're serially
* executing a scan that was planned to be parallel.
*/
scandesc = table_beginscan_es(node->ss.ss_currentRelation,
estate->es_snapshot,
nkeys, keys,
Expand All @@ -102,6 +107,7 @@ SeqNext(SeqScanState *node)
{
while (table_scan_getnextslot(scandesc, direction, slot))
{
// TODO: later pushdown bloom filter to AM
if (!PassByBloomFilter(&node->ss.ps, node->filters, slot))
continue;

Expand Down
Loading