Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1169,9 +1169,12 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {

def apply(plan: LogicalPlan, alwaysInline: Boolean): LogicalPlan = {
plan.transformUpWithPruning(_.containsPattern(PROJECT), ruleId) {
case p1 @ Project(_, p2: Project)
if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline) =>
p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
case p1 @ Project(_, p2: Project) =>
mergeProjectExpressions(p1.projectList, p2.projectList, alwaysInline) match {
case (Seq(), merged) => p2.copy(projectList = merged)
case (newUpper, newLower) =>
p1.copy(projectList = newUpper, child = p2.copy(projectList = newLower))
}
case p @ Project(_, agg: Aggregate)
if canCollapseExpressions(p.projectList, agg.aggregateExpressions, alwaysInline) &&
canCollapseAggregate(p, agg) =>
Expand All @@ -1191,6 +1194,71 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
}
}

private def cheapToInlineProducer(
producer: NamedExpression,
relatedConsumers: Iterable[Expression]) = trimAliases(producer) match {
// These collection creation functions are not cheap as a producer, but we have
// optimizer rules that can optimize them out if they are only consumed by
// ExtractValue (See SimplifyExtractValueOps), so we need to allow to inline them to
// avoid perf regression. As an example:
// Project(s.a, s.b, Project(create_struct(a, b, c) as s, child))
// We should collapse these two projects and eventually get Project(a, b, child)
case e @ (_: CreateNamedStruct | _: UpdateFields | _: CreateMap | _: CreateArray) =>
// We can inline the collection creation producer if at most one of its access
// is non-cheap. Cheap access here means the access can be optimized by
// `SimplifyExtractValueOps` and become a cheap expression. For example,
// `create_struct(a, b, c).a` is a cheap access as it can be optimized to `a`.
// For a query:
// Project(s.a, s, Project(create_struct(a, b, c) as s, child))
// We should collapse these two projects and eventually get
// Project(a, create_struct(a, b, c) as s, child)
var nonCheapAccessSeen = false
def nonCheapAccessVisitor(): Boolean = {
// Returns true for all calls after the first.
try {
nonCheapAccessSeen
} finally {
nonCheapAccessSeen = true
}
}

!relatedConsumers
.exists(findNonCheapAccesses(_, producer.toAttribute, e, nonCheapAccessVisitor))

case other => isCheap(other)
}

private def mergeProjectExpressions(
consumers: Seq[NamedExpression],
producers: Seq[NamedExpression],
alwaysInline: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = {
lazy val producerAttributes = AttributeSet(producers.collect { case a: Alias => a.toAttribute })

// A map from producer attributes to tuples of:
// - how many times the producer is referenced from consumers and
// - the set of consumers that reference the producer.
lazy val producerReferences = AttributeMap(consumers
.flatMap(e => collectReferences(e).filter(producerAttributes.contains).map(_ -> e))
.groupMap(_._1)(_._2)
.transform((_, v) => (v.size, ExpressionSet(v))))

val (substitute, keep) = producers.partition {
case a: Alias if producerReferences.contains(a.toAttribute) =>
val (count, relatedConsumers) = producerReferences(a.toAttribute)
a.deterministic &&
(alwaysInline || count == 1 || cheapToInlineProducer(a, relatedConsumers))

case _ => true
}

val substituted = buildCleanedProjectList(consumers, substitute)
if (keep.isEmpty) {
(Seq.empty, substituted)
} else {
(substituted, keep ++ AttributeSet(substitute.flatMap(_.references)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do like this new approach that allows partial merging of two Projects. It's a net win and we should merge it first.

But we should also reconsider the cost model for python UDFs. Merging them into a single Project is a huge win, and we should do it even if we need to replicate some expensive expressions. We can discuss it in a followup PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I can continue with a follow-up to merge all possible python UDFs after this PR.

}
}

/**
* Check if we can collapse expressions safely.
*/
Expand All @@ -1206,7 +1274,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
*/
def canCollapseExpressions(
consumers: Seq[Expression],
producerMap: Map[Attribute, Expression],
producerMap: Map[Attribute, NamedExpression],
alwaysInline: Boolean = false): Boolean = {
// We can only collapse expressions if all input expressions meet the following criteria:
// - The input is deterministic.
Expand All @@ -1221,38 +1289,8 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
val producer = producerMap.getOrElse(reference, reference)
val relatedConsumers = consumers.filter(_.references.contains(reference))

def cheapToInlineProducer: Boolean = trimAliases(producer) match {
// These collection creation functions are not cheap as a producer, but we have
// optimizer rules that can optimize them out if they are only consumed by
// ExtractValue (See SimplifyExtractValueOps), so we need to allow to inline them to
// avoid perf regression. As an example:
// Project(s.a, s.b, Project(create_struct(a, b, c) as s, child))
// We should collapse these two projects and eventually get Project(a, b, child)
case e @ (_: CreateNamedStruct | _: UpdateFields | _: CreateMap | _: CreateArray) =>
// We can inline the collection creation producer if at most one of its access
// is non-cheap. Cheap access here means the access can be optimized by
// `SimplifyExtractValueOps` and become a cheap expression. For example,
// `create_struct(a, b, c).a` is a cheap access as it can be optimized to `a`.
// For a query:
// Project(s.a, s, Project(create_struct(a, b, c) as s, child))
// We should collapse these two projects and eventually get
// Project(a, create_struct(a, b, c) as s, child)
var nonCheapAccessSeen = false
def nonCheapAccessVisitor(): Boolean = {
// Returns true for all calls after the first.
try {
nonCheapAccessSeen
} finally {
nonCheapAccessSeen = true
}
}

!relatedConsumers.exists(findNonCheapAccesses(_, reference, e, nonCheapAccessVisitor))

case other => isCheap(other)
}

producer.deterministic && (count == 1 || alwaysInline || cheapToInlineProducer)
producer.deterministic &&
(count == 1 || alwaysInline || cheapToInlineProducer(producer, relatedConsumers))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,20 @@ class CollapseProjectSuite extends PlanTest {
comparePlans(optimized, expected)
}
}

test("SPARK-53399: Merge expressions") {
val query = testRelation
.select($"a" + 1 as "a_plus_1", $"b" + 1 as "b_plus_1")
.select($"a_plus_1" + $"a_plus_1", $"b_plus_1")
.analyze

val optimized = Optimize.execute(query)

val expected = testRelation
.select($"a" + 1 as "a_plus_1", $"b")
.select($"a_plus_1" + $"a_plus_1", $"b" + 1 as "b_plus_1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it again, if b_plus_1 references 1000 columns, then this optimization actually makes the plan worse, as the lower Project becomes super wide.

The benefits of this new optimization:

  • if we happen to put two Python UDF into a single Project, it's a hude win.
  • combining two expressions allows the optimizer to make better decisions.

Given that we will handle Python UDF specially later, let's be more conservative here:

...
val newRefs = AttributeSet(substitute.flatMap(_.references))
if (newRefs.length <= substitute) ... else (consumers, producers)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Why actually does it cause problems if we have 1000 passthrough attributes in the lower project? And does it make sense to introduce a config (with a default like 200) to allow growing the lower project that wide?

  2. I tihnk when we calculate the newRefs we should remove those passthrough attributes from the set that are already in keep. (This logic is missing from the current PR too.)

  3. Also, if we have b_plus_1 that references 1000 columns and b_plus_2 that references only 1 column then we should keep only b_plus_1 in the lower project, but substitute b_plus_2 into the upper one. Actually I think we should probably sort the expressions in substitute by the net number of columns the expression's substitution would modify the lower project (this net number can be in the [-1..inf) range). Then we should retain the expressions in substitute until the running sum of those net values is <=0 and move the remaining expressions back to keep.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With whole stage codegen, a passthrough Project with many columns doesn't matter. But Spark already has many third-party native engines and each operator needs to materialize its output to columnar batches. Then such Project can hurt performance quite a bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ehh, I just hope that those engines don't materialize ouput project by project, otherwise it would be better to collapse everything...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh one more point: what if we already have two python UDFs in the lower project and we move one of them into the upper Project? Without merging the two Project, the benefit of combining two expressions is likely negligible. Moving expressions between two Projects should mainly consider Python UDFs.

Copy link
Contributor Author

@peter-toth peter-toth Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, indeed this is tough.
So shall we just force combining adjacent projects (when everything in them is deterministic) in a group of projects, if there is a parent project in the group with python UDFs?

Let me come up with something...

Copy link
Contributor Author

@peter-toth peter-toth Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened a more conservative PR here: #52238 and let me revisit this merging logic after that.

.analyze

comparePlans(optimized, expected)
}
}