Skip to content

Commit a74d50b

Browse files
ForVicForVic
authored andcommitted
[SPARK-53156][CORE] Track Driver Memory Metrics when the Application ends
### What changes were proposed in this pull request? Report a heartbeat on the driver when the application stops. ### Why are the changes needed? When the application proactively terminates due to some memory issues at the driver (SparkOOM, result size too large, etc...), due to metric sampling issues we will often miss this resourcing problem in the memory metrics and in the event log. We will abort the job before we capture accurate metrics for the driver. If we report an additional heartbeat (metric collection at the driver) on application termination than we will be able to better reflect the memory usage in the event log, shs, etc. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested Tested with a custom job that collected a large amount of data to the driver, and otherwise had very low driver memory usage, (low # partitions no other data structures used at driver), without the change we witnessed that the peak memory usage at the driver was low <~100MiB, with this change we witness the higher memory usage reflected. <img width="1723" height="230" alt="image" src="https://github.com/user-attachments/assets/fb442550-a262-453e-b6e2-f47e1e9f11b1" /> ### Was this patch authored or co-authored using generative AI tooling? No Closes #51882 from ForVic/vsunderl/report_driver_heartbeat. Lead-authored-by: ForVic <vsunderland@linkedin.com> Co-authored-by: Victor Sunderland <64456855+ForVic@users.noreply.github.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent 1485295 commit a74d50b

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

core/src/main/scala/org/apache/spark/Heartbeater.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ private[spark] class Heartbeater(
4848
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
4949
}
5050

51+
/**
52+
* Reports a heartbeat.
53+
*/
54+
def doReportHeartbeat(): Unit = {
55+
reportHeartbeat()
56+
}
57+
5158
/** Stops the heartbeat thread. */
5259
def stop(): Unit = {
5360
heartbeater.shutdown()

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2934,8 +2934,14 @@ class SparkContext(config: SparkConf) extends Logging {
29342934
_driverLogger.foreach(_.startSync(_hadoopConfiguration))
29352935
}
29362936

2937-
/** Post the application end event */
2937+
/** Post the application end event and report the final heartbeat */
29382938
private def postApplicationEnd(exitCode: Int): Unit = {
2939+
try {
2940+
_heartbeater.doReportHeartbeat()
2941+
} catch {
2942+
case t: Throwable =>
2943+
logInfo("Unable to report driver heartbeat metrics when stopping spark context", t);
2944+
}
29392945
listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis, Some(exitCode)))
29402946
}
29412947

0 commit comments

Comments
 (0)