-
Notifications
You must be signed in to change notification settings - Fork 28.8k
[SPARK-53399][SQL] Restore CollapseProject to merge unrelated expressions #52149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1169,9 +1169,12 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { | |
|
||
def apply(plan: LogicalPlan, alwaysInline: Boolean): LogicalPlan = { | ||
plan.transformUpWithPruning(_.containsPattern(PROJECT), ruleId) { | ||
case p1 @ Project(_, p2: Project) | ||
if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline) => | ||
p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList)) | ||
case p1 @ Project(_, p2: Project) => | ||
mergeProjectExpressions(p1.projectList, p2.projectList, alwaysInline) match { | ||
case (Seq(), merged) => p2.copy(projectList = merged) | ||
case (newUpper, newLower) => | ||
p1.copy(projectList = newUpper, child = p2.copy(projectList = newLower)) | ||
} | ||
case p @ Project(_, agg: Aggregate) | ||
if canCollapseExpressions(p.projectList, agg.aggregateExpressions, alwaysInline) && | ||
canCollapseAggregate(p, agg) => | ||
|
@@ -1191,6 +1194,71 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { | |
} | ||
} | ||
|
||
private def cheapToInlineProducer( | ||
producer: NamedExpression, | ||
relatedConsumers: Iterable[Expression]) = trimAliases(producer) match { | ||
// These collection creation functions are not cheap as a producer, but we have | ||
// optimizer rules that can optimize them out if they are only consumed by | ||
// ExtractValue (See SimplifyExtractValueOps), so we need to allow to inline them to | ||
// avoid perf regression. As an example: | ||
// Project(s.a, s.b, Project(create_struct(a, b, c) as s, child)) | ||
// We should collapse these two projects and eventually get Project(a, b, child) | ||
case e @ (_: CreateNamedStruct | _: UpdateFields | _: CreateMap | _: CreateArray) => | ||
// We can inline the collection creation producer if at most one of its access | ||
// is non-cheap. Cheap access here means the access can be optimized by | ||
// `SimplifyExtractValueOps` and become a cheap expression. For example, | ||
// `create_struct(a, b, c).a` is a cheap access as it can be optimized to `a`. | ||
// For a query: | ||
// Project(s.a, s, Project(create_struct(a, b, c) as s, child)) | ||
// We should collapse these two projects and eventually get | ||
// Project(a, create_struct(a, b, c) as s, child) | ||
var nonCheapAccessSeen = false | ||
def nonCheapAccessVisitor(): Boolean = { | ||
// Returns true for all calls after the first. | ||
try { | ||
nonCheapAccessSeen | ||
} finally { | ||
nonCheapAccessSeen = true | ||
} | ||
} | ||
|
||
!relatedConsumers | ||
.exists(findNonCheapAccesses(_, producer.toAttribute, e, nonCheapAccessVisitor)) | ||
|
||
case other => isCheap(other) | ||
} | ||
|
||
private def mergeProjectExpressions( | ||
consumers: Seq[NamedExpression], | ||
producers: Seq[NamedExpression], | ||
alwaysInline: Boolean): (Seq[NamedExpression], Seq[NamedExpression]) = { | ||
lazy val producerAttributes = AttributeSet(producers.collect { case a: Alias => a.toAttribute }) | ||
|
||
// A map from producer attributes to tuples of: | ||
// - how many times the producer is referenced from consumers and | ||
// - the set of consumers that reference the producer. | ||
lazy val producerReferences = AttributeMap(consumers | ||
.flatMap(e => collectReferences(e).filter(producerAttributes.contains).map(_ -> e)) | ||
.groupMap(_._1)(_._2) | ||
.transform((_, v) => (v.size, ExpressionSet(v)))) | ||
|
||
val (substitute, keep) = producers.partition { | ||
case a: Alias if producerReferences.contains(a.toAttribute) => | ||
val (count, relatedConsumers) = producerReferences(a.toAttribute) | ||
a.deterministic && | ||
(alwaysInline || count == 1 || cheapToInlineProducer(a, relatedConsumers)) | ||
|
||
case _ => true | ||
} | ||
|
||
val substituted = buildCleanedProjectList(consumers, substitute) | ||
if (keep.isEmpty) { | ||
(Seq.empty, substituted) | ||
} else { | ||
(substituted, keep ++ AttributeSet(substitute.flatMap(_.references))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do like this new approach that allows partial merging of two Projects. It's a net win and we should merge it first. But we should also reconsider the cost model for python UDFs. Merging them into a single Project is a huge win, and we should do it even if we need to replicate some expensive expressions. We can discuss it in a followup PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I can continue with a follow-up to merge all possible python UDFs after this PR. |
||
} | ||
} | ||
|
||
/** | ||
* Check if we can collapse expressions safely. | ||
*/ | ||
|
@@ -1206,7 +1274,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { | |
*/ | ||
def canCollapseExpressions( | ||
consumers: Seq[Expression], | ||
producerMap: Map[Attribute, Expression], | ||
producerMap: Map[Attribute, NamedExpression], | ||
alwaysInline: Boolean = false): Boolean = { | ||
// We can only collapse expressions if all input expressions meet the following criteria: | ||
// - The input is deterministic. | ||
|
@@ -1221,38 +1289,8 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { | |
val producer = producerMap.getOrElse(reference, reference) | ||
val relatedConsumers = consumers.filter(_.references.contains(reference)) | ||
|
||
def cheapToInlineProducer: Boolean = trimAliases(producer) match { | ||
// These collection creation functions are not cheap as a producer, but we have | ||
// optimizer rules that can optimize them out if they are only consumed by | ||
// ExtractValue (See SimplifyExtractValueOps), so we need to allow to inline them to | ||
// avoid perf regression. As an example: | ||
// Project(s.a, s.b, Project(create_struct(a, b, c) as s, child)) | ||
// We should collapse these two projects and eventually get Project(a, b, child) | ||
case e @ (_: CreateNamedStruct | _: UpdateFields | _: CreateMap | _: CreateArray) => | ||
// We can inline the collection creation producer if at most one of its access | ||
// is non-cheap. Cheap access here means the access can be optimized by | ||
// `SimplifyExtractValueOps` and become a cheap expression. For example, | ||
// `create_struct(a, b, c).a` is a cheap access as it can be optimized to `a`. | ||
// For a query: | ||
// Project(s.a, s, Project(create_struct(a, b, c) as s, child)) | ||
// We should collapse these two projects and eventually get | ||
// Project(a, create_struct(a, b, c) as s, child) | ||
var nonCheapAccessSeen = false | ||
def nonCheapAccessVisitor(): Boolean = { | ||
// Returns true for all calls after the first. | ||
try { | ||
nonCheapAccessSeen | ||
} finally { | ||
nonCheapAccessSeen = true | ||
} | ||
} | ||
|
||
!relatedConsumers.exists(findNonCheapAccesses(_, reference, e, nonCheapAccessVisitor)) | ||
|
||
case other => isCheap(other) | ||
} | ||
|
||
producer.deterministic && (count == 1 || alwaysInline || cheapToInlineProducer) | ||
producer.deterministic && | ||
(count == 1 || alwaysInline || cheapToInlineProducer(producer, relatedConsumers)) | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -298,4 +298,20 @@ class CollapseProjectSuite extends PlanTest { | |
comparePlans(optimized, expected) | ||
} | ||
} | ||
|
||
test("SPARK-53399: Merge expressions") { | ||
val query = testRelation | ||
.select($"a" + 1 as "a_plus_1", $"b" + 1 as "b_plus_1") | ||
.select($"a_plus_1" + $"a_plus_1", $"b_plus_1") | ||
.analyze | ||
|
||
val optimized = Optimize.execute(query) | ||
|
||
val expected = testRelation | ||
.select($"a" + 1 as "a_plus_1", $"b") | ||
.select($"a_plus_1" + $"a_plus_1", $"b" + 1 as "b_plus_1") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thinking about it again, if The benefits of this new optimization:
Given that we will handle Python UDF specially later, let's be more conservative here:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With whole stage codegen, a passthrough There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ehh, I just hope that those engines don't materialize ouput project by project, otherwise it would be better to collapse everything... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh one more point: what if we already have two python UDFs in the lower project and we move one of them into the upper Project? Without merging the two There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, indeed this is tough. Let me come up with something... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I opened a more conservative PR here: #52238 and let me revisit this merging logic after that. |
||
.analyze | ||
|
||
comparePlans(optimized, expected) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.