Skip to content

Commit 1fb74ea

Browse files
committed
feat: pushdown bloom filter to pax table am
This optimization pushes the Bloom filter conditions of runtime filters down to the Pax Table AM layer. By applying the filter before SeqNext() is reached, it eliminates the overhead of converting data from columnar format to a TupleTableSlot, resulting in faster query execution.
1 parent a70e655 commit 1fb74ea

File tree

16 files changed

+808
-61
lines changed

16 files changed

+808
-61
lines changed

contrib/pax_storage/src/cpp/access/pax_scanner.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
372372
&& !(flags & SO_TYPE_VECTOR)
373373
#endif
374374
) {
375-
filter->InitRowFilter(rel, ps, filter->GetColumnProjection());
375+
filter->InitRowFilter(rel, ps, filter->GetColumnProjection(), key, nkeys);
376376
}
377377
}
378378
return BeginScan(rel, snapshot, nkeys, key, parallel_scan, flags,

contrib/pax_storage/src/cpp/comm/cbdb_api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ extern "C" {
8080
#include "commands/progress.h"
8181
#include "commands/tablecmds.h"
8282
#include "funcapi.h"
83+
#include "lib/bloomfilter.h"
8384
#include "miscadmin.h"
8485
#include "nodes/bitmapset.h"
8586
#include "nodes/execnodes.h"

contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ namespace pax {
4343

4444
PaxFilter::PaxFilter() : sparse_filter_(nullptr), row_filter_(nullptr) {}
4545

46-
void PaxFilter::InitSparseFilter(Relation relation, List *quals,
47-
ScanKey key, int nkeys,
48-
bool allow_fallback_to_pg) {
46+
void PaxFilter::InitSparseFilter(Relation relation, List *quals, ScanKey key,
47+
int nkeys, bool allow_fallback_to_pg) {
4948
Assert(!sparse_filter_);
5049
sparse_filter_ =
5150
std::make_shared<PaxSparseFilter>(relation, allow_fallback_to_pg);
@@ -123,10 +122,11 @@ void PaxFilter::SetColumnProjection(const std::vector<int> &cols, int natts) {
123122
}
124123

125124
void PaxFilter::InitRowFilter(Relation relation, PlanState *ps,
126-
const std::vector<bool> &projection) {
125+
const std::vector<bool> &projection, ScanKey key,
126+
int nkeys) {
127127
Assert(!row_filter_);
128128
row_filter_ = std::make_shared<PaxRowFilter>();
129-
if (!row_filter_->Initialize(relation, ps, projection)) {
129+
if (!row_filter_->Initialize(relation, ps, projection, key, nkeys)) {
130130
row_filter_ = nullptr;
131131
}
132132
}

contrib/pax_storage/src/cpp/storage/filter/pax_filter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ class PaxFilter final {
7373

7474
// The row filter
7575
void InitRowFilter(Relation relation, PlanState *ps,
76-
const std::vector<bool> &projection);
76+
const std::vector<bool> &projection, ScanKey key,
77+
int nkeys);
7778
std::shared_ptr<PaxRowFilter> GetRowFilter();
7879

7980
void LogStatistics() const;

contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*/
2727

2828
#include "storage/filter/pax_row_filter.h"
29+
2930
#include "comm/cbdb_wrappers.h"
3031

3132
namespace paxc {
@@ -45,19 +46,54 @@ static inline void FindAttrsInQual(Node *qual, bool *proj, int ncol,
4546
}
4647

4748
static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
48-
pax::ExecutionFilterContext *ctx) {
49+
pax::ExecutionFilterContext *ctx,
50+
ScanKey key, int nkeys) {
4951
List *qual = ps->plan->qual;
5052
List **qual_list;
5153
ListCell *lc;
5254
bool *proj;
5355
int *qual_atts;
5456
int natts = RelationGetNumberOfAttributes(rel);
5557

56-
if (!qual || !IsA(qual, List)) return false;
58+
int ret = false;
59+
60+
if (key && nkeys > 0) {
61+
if (nodeTag(ps) != T_SeqScanState) {
62+
elog(ERROR, "runtime filter only support seqscan state, but got %d",
63+
nodeTag(ps));
64+
}
65+
66+
for (int i = 0; i < nkeys; i++) {
67+
if (key[i].sk_flags & SK_BLOOM_FILTER) {
68+
ctx->runtime_bloom_keys.emplace_back(key[i]);
69+
ret = true;
70+
}
71+
}
72+
73+
// register bloom filters
74+
for (int i = 0; i < (int)ctx->runtime_bloom_keys.size(); ++i) {
75+
pax::ExecutionFilterContext::FilterNode node;
76+
node.kind = pax::ExecutionFilterContext::FilterKind::kBloom;
77+
node.index = i;
78+
ctx->filter_nodes.emplace_back(node);
79+
}
80+
81+
if (ps->instrument) {
82+
ps->instrument->prf_work = true;
83+
}
84+
ctx->ps = ps;
85+
86+
// set filter_in_seqscan to false, so that the filter will not be executed
87+
// in SeqNext(), but will be executed in pax_row_filter
88+
auto seqscan = (SeqScanState *)ps;
89+
seqscan->filter_in_seqscan = false;
90+
}
91+
92+
if (!qual || !IsA(qual, List)) return ret;
5793

5894
if (list_length(qual) == 1 && IsA(linitial(qual), BoolExpr)) {
5995
auto boolexpr = (BoolExpr *)linitial(qual);
60-
if (boolexpr->boolop != AND_EXPR) return false;
96+
if (boolexpr->boolop != AND_EXPR) return ret;
6197
qual = boolexpr->args;
6298
}
6399
Assert(IsA(qual, List));
@@ -98,6 +134,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
98134
if (!qual_list[i]) continue;
99135
ctx->estates[k] = ExecInitQual(qual_list[i], ps);
100136
ctx->attnos[k] = i;
137+
// register expr filter node (by index k)
138+
pax::ExecutionFilterContext::FilterNode node;
139+
node.kind = pax::ExecutionFilterContext::FilterKind::kExpr;
140+
node.index = k;
141+
ctx->filter_nodes.emplace_back(node);
101142
list_free(qual_list[i]);
102143
k++;
103144
}
@@ -108,7 +149,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
108149
list_free(qual_list[0]);
109150
}
110151

111-
Assert(ctx->size > 0 || ctx->estate_final);
152+
Assert(ctx->size > 0 || ctx->estate_final ||
153+
ctx->runtime_bloom_keys.size() > 0);
154+
155+
// remove qual from plan state, so that the qual will not be executed in
156+
// executor, but will be executed in pax_row_filter
112157
ps->qual = nullptr;
113158

114159
pfree(proj);
@@ -117,20 +162,19 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
117162
return true;
118163
}
119164

120-
} // namespace paxc
121-
165+
} // namespace paxc
122166

123167
namespace pax {
124168

125169
PaxRowFilter::PaxRowFilter() {}
126170

127-
bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector<bool> &projection) {
171+
bool PaxRowFilter::Initialize(Relation rel, PlanState *ps,
172+
const std::vector<bool> &projection, ScanKey key,
173+
int nkeys) {
128174
bool ok = false;
129-
175+
130176
CBDB_WRAP_START;
131-
{
132-
ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_);
133-
}
177+
{ ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_, key, nkeys); }
134178
CBDB_WRAP_END;
135179

136180
if (ok) {
@@ -140,7 +184,8 @@ bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector<boo
140184
return ok;
141185
}
142186

143-
void PaxRowFilter::FillRemainingColumns(Relation rel, const std::vector<bool> &projection) {
187+
void PaxRowFilter::FillRemainingColumns(Relation rel,
188+
const std::vector<bool> &projection) {
144189
int natts = RelationGetNumberOfAttributes(rel);
145190
auto proj_len = projection.size();
146191
std::vector<bool> atts(natts);
@@ -162,5 +207,4 @@ void PaxRowFilter::FillRemainingColumns(Relation rel, const std::vector<bool> &p
162207
}
163208
}
164209

165-
166210
} // namespace pax

contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,38 @@ struct ExecutionFilterContext {
4242
ExprState *estate_final = nullptr;
4343
ExprState **estates;
4444
AttrNumber *attnos;
45+
PlanState *ps;
4546
int size = 0;
4647
inline bool HasExecutionFilter() const { return size > 0 || estate_final; }
48+
49+
// runtime bloom filters pushed down via SeqScanState->filters
50+
// (SK_BLOOM_FILTER)
51+
std::vector<ScanKeyData> runtime_bloom_keys;
52+
53+
// unified filter nodes (expr + bloom) for execution ordering
54+
enum class FilterKind { kExpr, kBloom };
55+
// One entry of filter_nodes: refers either to a per-column expression filter
// or to a runtime bloom filter, and carries the statistics gathered for it
// while sampling.
struct FilterNode {
56+
FilterKind kind;
57+
int index; // position of the filter: in estates for kExpr,
58+
// in runtime_bloom_keys for kBloom
59+
uint64 tested = 0; // rows this node was evaluated on while sampling
60+
uint64 passed = 0; // rows this node let through while sampling
61+
double score = 1.0; // pass rate (passed / tested); nodes are sorted by
// ascending score, so a lower score runs earlier
62+
};
63+
std::vector<FilterNode> filter_nodes;
64+
65+
// sampling control to determine filter order
66+
bool sampling = true;
67+
uint64 sample_target = 65536; // number of rows for sampling phase
68+
uint64 sample_rows = 0; // rows seen in sampling
4769
};
4870

4971
class PaxRowFilter final {
50-
public:
72+
public:
5173
PaxRowFilter();
5274

5375
bool Initialize(Relation rel, PlanState *ps,
54-
const std::vector<bool> &projection);
76+
const std::vector<bool> &projection, ScanKey key, int nkeys);
5577

5678
inline const ExecutionFilterContext *GetExecutionFilterContext() const {
5779
return &efctx_;
@@ -60,17 +82,16 @@ class PaxRowFilter final {
6082
inline const std::vector<AttrNumber> &GetRemainingColumns() const {
6183
return remaining_attnos_;
6284
}
63-
64-
private:
85+
86+
private:
6587
void FillRemainingColumns(Relation rel, const std::vector<bool> &projection);
6688

67-
private:
89+
private:
6890
ExecutionFilterContext efctx_;
6991
// all selected columns - single row filtering columns
7092
// before running final cross columns expression filtering, the remaining
7193
// columns should be filled.
7294
std::vector<AttrNumber> remaining_attnos_;
7395
};
7496

75-
76-
};
97+
}; // namespace pax

contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc

Lines changed: 99 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727

2828
#include "storage/micro_partition_row_filter_reader.h"
2929

30+
#include <algorithm>
31+
3032
#include "comm/guc.h"
3133
#include "comm/log.h"
3234
#include "comm/pax_memory.h"
3335
#include "storage/filter/pax_filter.h"
34-
#include "storage/filter/pax_sparse_filter.h"
3536
#include "storage/filter/pax_row_filter.h"
37+
#include "storage/filter/pax_sparse_filter.h"
3638
#include "storage/pax_defined.h"
3739
#include "storage/pax_itemptr.h"
3840

@@ -73,6 +75,95 @@ MicroPartitionRowFilterReader::GetNextGroup(TupleDesc desc) {
7375
return group_;
7476
}
7577

78+
// Copies into `slot` the value and null flag of every column listed in
// ctx->attnos[0 .. ctx->size) for the row at `row_index` of `group`.
// Only the slot entries of those columns are written.
void MicroPartitionRowFilterReader::LoadExprFilterColumns(
    MicroPartitionReader::Group *group, TupleDesc desc,
    const ExecutionFilterContext *ctx, size_t row_index, TupleTableSlot *slot) {
  const int filter_count = ctx->size;
  for (int idx = 0; idx < filter_count; ++idx) {
    const auto attno = ctx->attnos[idx];
    Assert(attno > 0);
    // attnos are 1-based while the slot arrays are 0-based
    const int col = attno - 1;
    std::tie(slot->tts_values[col], slot->tts_isnull[col]) =
        group->GetColumnValue(desc, col, row_index);
  }
}
88+
89+
bool MicroPartitionRowFilterReader::EvalBloomNode(
90+
const ExecutionFilterContext *ctx, MicroPartitionReader::Group *group,
91+
TupleDesc desc, size_t row_index, int bloom_index) {
92+
Assert(bloom_index >= 0 &&
93+
(size_t)bloom_index < ctx->runtime_bloom_keys.size());
94+
const auto &skd = ctx->runtime_bloom_keys[bloom_index];
95+
const ScanKey sk = const_cast<ScanKeyData *>(&skd);
96+
bool isnull = false;
97+
Datum val;
98+
std::tie(val, isnull) =
99+
group->GetColumnValue(desc, sk->sk_attno - 1, row_index);
100+
if (isnull) return true;
101+
bloom_filter *bf = (bloom_filter *)DatumGetPointer(sk->sk_argument);
102+
return !bloom_lacks_element(bf, (unsigned char *)&val, sizeof(Datum));
103+
}
104+
105+
bool MicroPartitionRowFilterReader::EvalExprNode(
106+
const ExecutionFilterContext *ctx, TupleTableSlot *slot, int expr_index) {
107+
return TestRowScanInternal(slot, ctx->estates[expr_index],
108+
ctx->attnos[expr_index]);
109+
}
110+
111+
// Evaluates a single filter node for the row at `row_index` and returns
// whether the row passes it.
//
// kBloom nodes are dispatched to EvalBloomNode(); every other kind goes to
// EvalExprNode(). A row rejected by a bloom node bumps
// ctx->ps->instrument->nfilteredPRF when instrumentation is present.
// When `update_stats` is set, the node's tested/passed counters are updated.
bool MicroPartitionRowFilterReader::EvalFilterNode(
    ExecutionFilterContext *ctx, MicroPartitionReader::Group *group,
    TupleDesc desc, size_t row_index, TupleTableSlot *slot,
    ExecutionFilterContext::FilterNode &node, bool update_stats) {
  const bool is_bloom =
      node.kind == ExecutionFilterContext::FilterKind::kBloom;

  const bool accepted =
      is_bloom ? EvalBloomNode(ctx, group, desc, row_index, node.index)
               : EvalExprNode(ctx, slot, node.index);

  if (is_bloom && ctx->ps->instrument && !accepted) {
    ctx->ps->instrument->nfilteredPRF += 1;
  }

  if (update_stats) {
    ++node.tested;
    if (accepted) {
      ++node.passed;
    }
  }
  return accepted;
}
128+
129+
// Runs every node of ctx->filter_nodes against the row at `row_index`,
// stopping at the first node that rejects it.
//
// While ctx->sampling is set, per-node tested/passed counters are collected
// and ctx->sample_rows counts the rows seen. Once sample_target rows have been
// seen, each node's score is set to its observed pass rate (1.0 for nodes that
// were never evaluated), the nodes are stably sorted by ascending score (ties
// broken by kind), and sampling is switched off.
//
// Returns true when the row passes all filters, false otherwise.
bool MicroPartitionRowFilterReader::ApplyFiltersWithSampling(
    ExecutionFilterContext *ctx, MicroPartitionReader::Group *group,
    TupleDesc desc, size_t row_index, TupleTableSlot *slot) {
  // Snapshot the flag: statistics are gathered only for rows evaluated while
  // the sampling window is open.
  const bool collect_stats = ctx->sampling;

  bool all_pass = true;
  for (auto &node : ctx->filter_nodes) {
    if (!EvalFilterNode(ctx, group, desc, row_index, slot, node,
                        collect_stats)) {
      all_pass = false;
      break;
    }
  }
  if (!collect_stats) {
    return all_pass;
  }

  ctx->sample_rows++;

  // Close the sampling window as soon as enough rows have been seen, whether
  // or not this row passed. Tying the check to a passing row would let a
  // filter chain that rejects (almost) every row sample forever and never get
  // reordered, which is exactly the case where ordering matters most.
  if (ctx->sample_rows >= ctx->sample_target) {
    for (auto &node : ctx->filter_nodes) {
      node.score = (node.tested == 0)
                       ? 1.0
                       : static_cast<double>(node.passed) /
                             static_cast<double>(node.tested);
    }
    std::stable_sort(ctx->filter_nodes.begin(), ctx->filter_nodes.end(),
                     [](const auto &a, const auto &b) {
                       // Lower pass rate first (better selectivity)
                       if (a.score != b.score) return a.score < b.score;
                       return static_cast<int>(a.kind) <
                              static_cast<int>(b.kind);
                     });
    ctx->sampling = false;
  }
  return all_pass;
}
166+
76167
bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) {
77168
auto g = group_;
78169
Assert(filter_->GetRowFilter());
@@ -108,16 +199,14 @@ bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) {
108199
}
109200
}
110201

111-
for (int i = 0; i < ctx->size; i++) {
112-
auto attno = ctx->attnos[i];
113-
Assert(attno > 0);
114-
std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) =
115-
g->GetColumnValue(desc, attno - 1, current_group_row_index_);
116-
if (!TestRowScanInternal(slot, ctx->estates[i], attno)) {
117-
current_group_row_index_++;
118-
goto retry_next;
119-
}
202+
LoadExprFilterColumns(g.get(), desc, ctx, current_group_row_index_, slot);
203+
if (!ApplyFiltersWithSampling(const_cast<ExecutionFilterContext *>(ctx),
204+
g.get(), desc, current_group_row_index_,
205+
slot)) {
206+
current_group_row_index_++;
207+
goto retry_next;
120208
}
209+
121210
for (auto attno : remaining_columns) {
122211
std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) =
123212
g->GetColumnValue(desc, attno - 1, current_group_row_index_);

0 commit comments

Comments
 (0)