Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ public ExecGroup getExecGroup(int nodeId) {
return execGroup;
}
}
Preconditions.checkState(false, "not found exec group node: %d", nodeId);
return null;
}

public ExecGroupId getExecGroupId(int nodeId) {
ExecGroup group = getExecGroup(nodeId);
if (group != null) {
return group.getGroupId();
}
return new ExecGroupId(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public boolean canAcceptFilter(PlanNode node, RuntimeFilterPushDownContext rfPus
}
// colocate runtime filter couldn't apply to other exec groups
if (isBuildFromColocateGroup && joinMode.equals(COLOCATE)) {
int probeExecGroupId = rfPushCtx.getExecGroup(node.getId().asInt()).getGroupId().asInt();
int probeExecGroupId = rfPushCtx.getExecGroupId(node.getId().asInt()).asInt();
if (execGroupId != probeExecGroupId) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public RuntimeFilterDescription getDescription() {
return description;
}

public ExecGroup getExecGroup(int planNodeId) {
return this.execGroups.getExecGroup(planNodeId);
public ExecGroupId getExecGroupId(int planNodeId) {
return this.execGroups.getExecGroupId(planNodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ public void partialSupported() throws Exception {
querys.add("select distinct tb.k1,tb.k2,tb.k3,tb.k4 from (select l.k1 k1, l.k2 k2,r.k1 k3,r.k2 k4 " +
"from (select k1, k2 from colocate1 l) l join [bucket] colocate2 r on l.k1 = r.k1 and l.k2 = r.k2) tb " +
"join colocate1 z;");
// Colocate Join
// / \
// Bucket Shuffle Join Scan
// / \
// Scan Exchange
// Scan
querys.add("select * from colocate1 l left join [bucket] colocate2 r on l.k1=r.k1 and l.k2=r.k2 " +
"join [colocate] colocate2 z on l.k1=z.k1 and l.k2=z.k2;");
// CTE as probe runtime filter probe side
querys.add("with a as (select distinct k1, k2 from colocate1) " +
"select distinct l.k1,r.k2 from colocate1 l join [broadcast] a r on l.k1=r.k1 and l.k2=r.k2 " +
Expand Down
Loading