Skip to content

Conversation

peter-toth
Copy link
Contributor

@peter-toth peter-toth commented Aug 27, 2025

What changes were proposed in this pull request?

Latest improvements to CollapseProject rule (like #33958) prevented duplicating expressive expressons, which brings considerable performance improvement in many cases.
But there is one particular case when it can introduce significant perfoamance degradation. Consider a query where the adjacent project nodes don't get collapsed because they contain expensive, multiple referenced expressions, but the nodes also contain Python UDF expressions that otherwise wouldn't prevent project node collapsion. E.g.:

Project a + a as a_plus_a, PythonUDF(...) as udf2, udf1
  Project <expensive calculation> as a, PythonUDF(...) as udf1
    ...

In the above example CollapseProject doesn't modify the 2 project nodes, which then causes 2 BatchEvalPython nodes to appear in the plan when ExtractPythonUDFs extracts them:

Project a + a as a_plus_a, udf2, udf1
  BatchEvalPython PythonUDF(...) -> udf2
    Project <expensive calculation> as a, udf1
      BatchEvalPython PythonUDF(...) -> udf1
        ...

The 2 BatchEvalPython nodes can cause significant serialization/deserialization overhead compared to the case when the original project nodes were collapsed and we had only 1 BatchEvalPython node.

The old behaviour can be restored with setting spark.sql.optimizer.avoidCollapseUDFWithExpensiveExpr=true, but it is still not ideal as we lose the performance improvement in other cases.

This PR improves to the CollapseProject rule to not just collapse or don't collapse adjacent nodes, but be able to decide on individual expressions if it makes sense to merge them with expressions from the other node.

After this PR the rule modifies the examle plan and merges the unrelated PythonUDFs into the consumer Project node, while it keeps the related a and a + a as a_plus_a expressions separate:

Project a + a as a_plus_a, PythonUDF(...) as udf2, PythonUDF(...) as udf1
  Project <expensive calculation> as a
    ...

The plan is then transformed to:

Project a + a as a_plus_a, udf2, udf1
  BatchEvalPython PythonUDF(...) -> udf2, PythonUDF(...) -> udf1
    Project <expensive calculation> as a
      ...

and both Python UDFs are calculated in one run.

Why are the changes needed?

To fix performance regression caused by latest changes to CollapseProject.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New and existing UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Aug 27, 2025
@peter-toth peter-toth changed the title [WIP][SPARK-53399][SQL] Improve CollapseProject [SPARK-53399][SQL] Improve CollapseProject Aug 27, 2025
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a regression bug fix, we had better be clear in the PR title instead of Improve CollapseProject. Also, please revise the Affected Versions if you want to backport this to live release branches, @peter-toth .

Screenshot 2025-08-27 at 13 35 51

@peter-toth peter-toth changed the title [SPARK-53399][SQL] Improve CollapseProject [SPARK-53399][SQL] Refine CollapseProject to merge unrelated expressions Aug 28, 2025
@peter-toth peter-toth changed the title [SPARK-53399][SQL] Refine CollapseProject to merge unrelated expressions [SPARK-53399][SQL] Fix CollapseProject to merge unrelated expressions Aug 28, 2025
@peter-toth peter-toth changed the title [SPARK-53399][SQL] Fix CollapseProject to merge unrelated expressions [SPARK-53399][SQL] Restore CollapseProject to merge unrelated expressions Aug 28, 2025
@peter-toth
Copy link
Contributor Author

@cloud-fan, as you worked on CollapseProject, can you please take a look at this PR if you have some time?

@peter-toth
Copy link
Contributor Author

Since this is a regression bug fix, we had better be clear in the PR title instead of Improve CollapseProject. Also, please revise the Affected Versions if you want to backport this to live release branches, @peter-toth .

Thanks @dongjoon-hyun. I renamed the PR and fixed the affected versions of the ticket.

@cloud-fan
Copy link
Contributor

@peter-toth shall we create a new rule for it? I think it's not about collapsing Projects, but merging batch-eval expressions such as Python UDF from adjacent Projects into the top Project.

@dongjoon-hyun
Copy link
Member

+1 for @cloud-fan 's opinion.

@peter-toth
Copy link
Contributor Author

peter-toth commented Aug 29, 2025

Technically, I can create a new rule and limit the scope to batch eval expressions like Python UDFs, but I feel the new rule will share most of its logic with CollapseProject. Like checking if the producer is referenced at most once or if not then determining if it is cheap to inline the producer, as I don't want to move an expression that contains a Python UDF to the top project node in other cases.
I agree that the name CollapseProject doesn't fully describe the functionality after this PR, but do we surely want to have a new rule for a very similar logic?

@cloud-fan
Copy link
Contributor

cloud-fan commented Sep 1, 2025

@peter-toth can you elaborate on the cost model part? In ExtractPythonUDFs, we always deduplicate python UDFs, I think we can blindly merge Python UDFs into the top Project.

@peter-toth
Copy link
Contributor Author

peter-toth commented Sep 1, 2025

For example when we have a non_cheap_operation(PythonUDF()) expression in the producer and the producer is referenced more than once then I don't want to substitute it into the consumers, otherwise we might introduce a new performance regression.

Also, I would not limit the scope to PythonUDFs because in the below case merging the create struct operation from the producer and the extract field from the consumer makes sense, even if the ... expressions make the 2 projects non-collapsable:

Project ..., s.a
  Project ..., create_struct(a, b, c) as s

after improved CollapseProject in this PR and SimplifyExtractValueOps:

Project ..., a
  Project ..., a

And we can come up with more comlicated cases along the logic of cheapToInlineProducer()

IMO it always makes sense to merge expressions to open the way for other rules to optimize them further, and leave only those expressions in the lower project node that otherwise would be extracted if we had a plan level common subexpression elimination rule.

@peter-toth
Copy link
Contributor Author

@cloud-fan , @dongjoon-hyun , do you think we can proceed with this PR based on the above?

producers: Seq[NamedExpression],
alwaysInline: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = {
lazy val producerAttributes = AttributeSet(producers.collect { case a: Alias => a.toAttribute })
lazy val producerReferences = AttributeMap(consumers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment to explain what the key-value is? the name producerReferences doesn't explain it.

if (keep.isEmpty) {
(Seq.empty, substituted)
} else {
(substituted, keep ++ AttributeSet(substitute.flatMap(_.references)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do like this new approach that allows partial merging of two Projects. It's a net win and we should merge it first.

But we should also reconsider the cost model for python UDFs. Merging them into a single Project is a huge win, and we should do it even if we need to replicate some expensive expressions. We can discuss it in a followup PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants