[SPARK-53399][SQL] Restore CollapseProject to merge unrelated expressions #52149
Conversation
Since this is a regression bug fix, we had better be clear in the PR title instead of Improve CollapseProject. Also, please revise the Affected Versions if you want to backport this to live release branches, @peter-toth.

@cloud-fan, as you worked on …
Thanks @dongjoon-hyun. I renamed the PR and fixed the affected versions of the ticket.
@peter-toth shall we create a new rule for it? I think it's not about collapsing Projects, but merging batch-eval expressions such as Python UDFs from adjacent Projects into the top Project.
+1 for @cloud-fan's opinion.
Technically, I can create a new rule and limit the scope to batch-eval expressions like Python UDFs, but I feel the new rule will share most of its logic with CollapseProject.
@peter-toth can you elaborate on the cost model part? In …
For example when we have a …

Also, I would not limit the scope to … after improved …

And we can come up with more complicated cases along the logic of … IMO it always makes sense to merge expressions to open the way for other rules to optimize them further, and leave only those expressions in the lower Project node that otherwise would be extracted if we had a plan-level common subexpression elimination rule.
@cloud-fan, @dongjoon-hyun, do you think we can proceed with this PR based on the above?
producers: Seq[NamedExpression],
alwaysInline: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = {
  lazy val producerAttributes = AttributeSet(producers.collect { case a: Alias => a.toAttribute })
  lazy val producerReferences = AttributeMap(consumers
Can we add a comment to explain what the key-value is? The name producerReferences doesn't explain it.
if (keep.isEmpty) {
  (Seq.empty, substituted)
} else {
  (substituted, keep ++ AttributeSet(substitute.flatMap(_.references)))
I do like this new approach that allows partial merging of two Projects. It's a net win and we should merge it first.
But we should also reconsider the cost model for python UDFs. Merging them into a single Project is a huge win, and we should do it even if we need to replicate some expensive expressions. We can discuss it in a followup PR.
What changes were proposed in this pull request?
The latest improvements to the CollapseProject rule (like #33958) prevented duplicating expensive expressions, which brings considerable performance improvement in many cases. But there is one particular case where it can introduce a significant performance degradation: consider a query where the adjacent Project nodes don't get collapsed because they contain expensive expressions that are referenced multiple times, but the nodes also contain Python UDF expressions that otherwise wouldn't prevent collapsing the Project nodes. E.g.:
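Roughly, such a query could look like the following PySpark sketch. The UDFs and column names here are hypothetical rather than the exact reproduction from the PR, and whether CollapseProject actually keeps the two Projects separate depends on its cost model:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType

spark = SparkSession.builder.getOrCreate()

# Two unrelated Python UDFs (hypothetical placeholders).
plus_one = udf(lambda x: x + 1, LongType())
times_two = udf(lambda x: x * 2, LongType())

df = spark.range(10)

# Lower Project: an expression `a` that the upper Project references twice,
# plus one Python UDF.
lower = df.select(
    (col("id") * col("id")).alias("a"),
    plus_one(col("id")).alias("u1"),
)

# Upper Project: references `a` twice, plus a second Python UDF.
# If duplicating `a` is considered too expensive, CollapseProject keeps the
# two Projects separate, and ExtractPythonUDFs then produces one
# BatchEvalPython node per Project.
upper = lower.select(
    (col("a") + col("a")).alias("a_plus_a"),
    col("u1"),
    times_two(col("a")).alias("u2"),
)
```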
In the above example CollapseProject doesn't modify the 2 Project nodes, which then causes 2 BatchEvalPython nodes to appear in the plan when ExtractPythonUDFs extracts them. The 2 BatchEvalPython nodes can cause significant serialization/deserialization overhead compared to the case where the original Project nodes were collapsed and we had only 1 BatchEvalPython node. The old behaviour can be restored by setting spark.sql.optimizer.avoidCollapseUDFWithExpensiveExpr=true, but it is still not ideal as we lose the performance improvement in other cases.
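For reference, that config can be set on a running session as follows (only the config name and its effect come from the paragraph above; treat the snippet as a sketch):

```python
# Per the description above: restores the old collapsing behaviour for
# Projects containing UDFs, at the cost of the improvement in other cases.
spark.conf.set("spark.sql.optimizer.avoidCollapseUDFWithExpensiveExpr", "true")
```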
This PR improves the CollapseProject rule so that it doesn't just collapse or refuse to collapse adjacent nodes, but can decide for individual expressions whether it makes sense to merge them with expressions from the other node. After this PR the rule modifies the example plan and merges the unrelated PythonUDFs into the consumer Project node, while it keeps the related a and a + a as a_plus_a expressions separate. The plan is then transformed so that both Python UDFs are calculated in one run.
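A quick way to observe the difference (a sketch, assuming the hypothetical upper DataFrame from the earlier example) is to inspect the physical plan and count the Python-evaluation nodes:

```python
# Before the fix the plan shows two BatchEvalPython nodes (one per Project);
# with this PR both UDFs end up in a single BatchEvalPython node.
upper.explain(mode="formatted")
```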
Why are the changes needed?
To fix a performance regression caused by the latest changes to CollapseProject.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New and existing UTs.
Was this patch authored or co-authored using generative AI tooling?
No.