Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
private final int numElementsForSpillThreshold;

/**
* Force this sorter to spill when the size in memory is beyond this threshold.
* Force this sorter to spill when the in memory size in bytes is beyond this threshold.
*/
private final long recordsSizeForSpillThreshold;
private final long sizeInBytesForSpillThreshold;

/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSizeBytes;
Expand All @@ -117,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
@Nullable private ShuffleInMemorySorter inMemSorter;
@Nullable private MemoryBlock currentPage = null;
private long pageCursor = -1;
private long inMemRecordsSize = 0;
private long totalPageMemoryUsageBytes = 0;

// Checksum calculator for each partition. Empty when shuffle checksum disabled.
private final Checksum[] partitionChecksums;
Expand All @@ -142,7 +142,7 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
(int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.numElementsForSpillThreshold =
(int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
this.recordsSizeForSpillThreshold =
this.sizeInBytesForSpillThreshold =
(long) conf.get(package$.MODULE$.SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD());
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
Expand Down Expand Up @@ -314,11 +314,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
}

private long getMemoryUsage() {
long totalPageSize = 0;
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageMemoryUsageBytes;
}

private void updatePeakMemoryUsed() {
Expand All @@ -342,11 +338,11 @@ private long freeMemory() {
for (MemoryBlock block : allocatedPages) {
memoryFreed += block.size();
freePage(block);
totalPageMemoryUsageBytes -= block.size();
}
allocatedPages.clear();
currentPage = null;
pageCursor = 0;
inMemRecordsSize = 0;
return memoryFreed;
}

Expand Down Expand Up @@ -417,6 +413,7 @@ private void acquireNewPageIfNecessary(int required) {
currentPage = allocatePage(required);
pageCursor = currentPage.getBaseOffset();
allocatedPages.add(currentPage);
totalPageMemoryUsageBytes += currentPage.size();
}
}

Expand All @@ -432,10 +429,17 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
MDC.of(LogKeys.NUM_ELEMENTS_SPILL_RECORDS, inMemSorter.numRecords()),
MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD, numElementsForSpillThreshold));
spill();
} else if (inMemRecordsSize >= recordsSizeForSpillThreshold) {
logger.info("Spilling data because size of spilledRecords ({}) crossed the size threshold {}",
MDC.of(LogKeys.SPILL_RECORDS_SIZE, inMemRecordsSize),
MDC.of(LogKeys.SPILL_RECORDS_SIZE_THRESHOLD, recordsSizeForSpillThreshold));
}

// TODO: Ideally we only need to check the spill threshold when new memory needs to be
// allocated (both this sorter and the underlying ShuffleInMemorySorter may allocate
// new memory), but it's simpler to check the total memory usage of these two sorters
// before inserting each record.
final long usedMemory = getMemoryUsage();
if (usedMemory >= sizeInBytesForSpillThreshold) {
logger.info("Spilling data because memory usage ({}) crossed the threshold {}",
MDC.of(LogKeys.SPILL_RECORDS_SIZE, usedMemory),
MDC.of(LogKeys.SPILL_RECORDS_SIZE_THRESHOLD, sizeInBytesForSpillThreshold));
spill();
}

Expand All @@ -453,7 +457,6 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
inMemSorter.insertRecord(recordAddress, partitionId);
inMemRecordsSize += required;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private final int numElementsForSpillThreshold;

/**
* Force this sorter to spill when the size in memory is beyond this threshold.
* Force this sorter to spill when the in memory size in bytes is beyond this threshold.
*/
private final long recordsSizeForSpillThreshold;
private final long sizeInBytesForSpillThreshold;

/**
* Memory pages that hold the records being sorted. The pages in this list are freed when
Expand All @@ -96,7 +96,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {

// These variables are reset after spilling:
@Nullable private volatile UnsafeInMemorySorter inMemSorter;
private long inMemRecordsSize = 0;
private long totalPageMemoryUsageBytes = 0;

private MemoryBlock currentPage = null;
private long pageCursor = -1;
Expand All @@ -115,12 +115,12 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
long recordsSizeForSpillThreshold,
long sizeInBytesForSpillThreshold,
UnsafeInMemorySorter inMemorySorter,
long existingMemoryConsumption) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
pageSizeBytes, numElementsForSpillThreshold, recordsSizeForSpillThreshold,
pageSizeBytes, numElementsForSpillThreshold, sizeInBytesForSpillThreshold,
inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption);
Expand All @@ -140,11 +140,11 @@ public static UnsafeExternalSorter create(
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
long recordsSizeForSpillThreshold,
long sizeInBytesForSpillThreshold,
boolean canUseRadixSort) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes,
numElementsForSpillThreshold, recordsSizeForSpillThreshold, null, canUseRadixSort);
numElementsForSpillThreshold, sizeInBytesForSpillThreshold, null, canUseRadixSort);
}

private UnsafeExternalSorter(
Expand All @@ -157,7 +157,7 @@ private UnsafeExternalSorter(
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
long recordsSizeForSpillThreshold,
long sizeInBytesForSpillThreshold,
@Nullable UnsafeInMemorySorter existingInMemorySorter,
boolean canUseRadixSort) {
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
Expand Down Expand Up @@ -187,7 +187,7 @@ private UnsafeExternalSorter(
this.inMemSorter = existingInMemorySorter;
}
this.peakMemoryUsedBytes = getMemoryUsage();
this.recordsSizeForSpillThreshold = recordsSizeForSpillThreshold;
this.sizeInBytesForSpillThreshold = sizeInBytesForSpillThreshold;
this.numElementsForSpillThreshold = numElementsForSpillThreshold;

// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
Expand Down Expand Up @@ -248,7 +248,6 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
// pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array.
inMemSorter.freeMemory();
inMemRecordsSize = 0;
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory
// pages, we might not be able to get memory for the pointer array.
Expand All @@ -264,11 +263,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
* array.
*/
private long getMemoryUsage() {
long totalPageSize = 0;
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageMemoryUsageBytes;
}

private void updatePeakMemoryUsed() {
Expand Down Expand Up @@ -320,6 +315,7 @@ private long freeMemory() {
for (MemoryBlock block : pagesToFree) {
memoryFreed += block.size();
freePage(block);
totalPageMemoryUsageBytes -= block.size();
}
return memoryFreed;
}
Expand Down Expand Up @@ -378,6 +374,7 @@ public void cleanupResources() {
} finally {
for (MemoryBlock pageToFree : pagesToFree) {
freePage(pageToFree);
totalPageMemoryUsageBytes -= pageToFree.size();
}
if (inMemSorterToFree != null) {
inMemSorterToFree.freeMemory();
Expand Down Expand Up @@ -448,6 +445,7 @@ private void acquireNewPageIfNecessary(int required) {
currentPage = allocatePage(required);
pageCursor = currentPage.getBaseOffset();
allocatedPages.add(currentPage);
totalPageMemoryUsageBytes += currentPage.size();
}
}

Expand Down Expand Up @@ -495,10 +493,17 @@ public void insertRecord(
MDC.of(LogKeys.NUM_ELEMENTS_SPILL_RECORDS, inMemSorter.numRecords()),
MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD, numElementsForSpillThreshold));
spill();
} else if (inMemRecordsSize >= recordsSizeForSpillThreshold) {
logger.info("Spilling data because size of spilledRecords ({}) crossed the size threshold {}",
MDC.of(LogKeys.SPILL_RECORDS_SIZE, inMemRecordsSize),
MDC.of(LogKeys.SPILL_RECORDS_SIZE_THRESHOLD, recordsSizeForSpillThreshold));
}

// TODO: Ideally we only need to check the spill threshold when new memory needs to be
// allocated (both this sorter and the underlying UnsafeInMemorySorter may allocate
// new memory), but it's simpler to check the total memory usage of these two sorters
// before inserting each record.
final long usedMemory = getMemoryUsage();
if (usedMemory >= sizeInBytesForSpillThreshold) {
logger.info("Spilling data because memory usage ({}) crossed the threshold {}",
MDC.of(LogKeys.SPILL_RECORDS_SIZE, usedMemory),
MDC.of(LogKeys.SPILL_RECORDS_SIZE_THRESHOLD, sizeInBytesForSpillThreshold));
spill();
}

Expand All @@ -514,7 +519,6 @@ public void insertRecord(
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
inMemRecordsSize += required;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1599,9 +1599,9 @@ package object config {
.createWithDefault(Integer.MAX_VALUE)

private[spark] val SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD =
ConfigBuilder("spark.shuffle.spill.maxRecordsSizeForSpillThreshold")
ConfigBuilder("spark.shuffle.spill.maxSizeInBytesForSpillThreshold")
.internal()
.doc("The maximum size in memory before forcing the shuffle sorter to spill. " +
.doc("The maximum in memory size in bytes before forcing the shuffle sorter to spill. " +
"By default it is Long.MAX_VALUE, which means we never force the sorter to spill, " +
"until we reach some limitations, like the max page size limitation for the pointer " +
"array in the sorter.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public UnsafeKVExternalSorter(
SerializerManager serializerManager,
long pageSizeBytes,
int numElementsForSpillThreshold,
long maxRecordsSizeForSpillThreshold) throws IOException {
long sizeInBytesForSpillThreshold) throws IOException {
this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes,
numElementsForSpillThreshold, maxRecordsSizeForSpillThreshold, null);
numElementsForSpillThreshold, sizeInBytesForSpillThreshold, null);
}

public UnsafeKVExternalSorter(
Expand All @@ -73,7 +73,7 @@ public UnsafeKVExternalSorter(
SerializerManager serializerManager,
long pageSizeBytes,
int numElementsForSpillThreshold,
long maxRecordsSizeForSpillThreshold,
long sizeInBytesForSpillThreshold,
@Nullable BytesToBytesMap map) throws IOException {
this.keySchema = keySchema;
this.valueSchema = valueSchema;
Expand All @@ -100,7 +100,7 @@ public UnsafeKVExternalSorter(
(int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
pageSizeBytes,
numElementsForSpillThreshold,
maxRecordsSizeForSpillThreshold,
sizeInBytesForSpillThreshold,
canUseRadixSort);
} else {
// During spilling, the pointer array in `BytesToBytesMap` will not be used, so we can borrow
Expand Down Expand Up @@ -168,7 +168,7 @@ public UnsafeKVExternalSorter(
(int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
pageSizeBytes,
numElementsForSpillThreshold,
maxRecordsSizeForSpillThreshold,
sizeInBytesForSpillThreshold,
inMemSorter,
map.getTotalMemoryConsumption());

Expand Down
Loading