Class SortMemoryManager
- Drill is not designed for small memory allocations, but the planner may provide them because the memory for a query is divided among slices (minor fragments) and among buffering operators, leaving very little per operator.
- Drill does not provide the detailed memory information needed to carefully manage memory under tight constraints.
- Yet Drill imposes a death penalty (query failure) for exceeding the memory limit.
First, it is necessary to differentiate two concepts:
- The data size of a batch: the amount of memory needed to hold the data itself. The data size is constant for any given batch.
- The buffer size: the amount of memory allocated to the buffers that hold the data. The buffer size varies wildly depending on how the batch was produced:
- One buffer per vector component (data, offsets, null flags, etc.): created by readers, project, and other operators.
- One buffer for the entire batch, with each vector component using a slice of the overall buffer: the case for batches deserialized from exchanges.
- One buffer for each top-level vector, with component vectors using slices of the overall vector buffer: the result of reading spilled batches from disk.
As a result, we can never be sure of the amount of memory needed for a batch. So, we have to estimate based on a number of factors:
- Uses the RecordBatchSizer to estimate the data size and buffer size of each incoming batch.
- Estimates the internal fragmentation due to power-of-two rounding.
- Configured preferences for spill and output batches.
- In normal memory, we simply work out the number of preferred-size batches that fit in memory (based on the predicted buffer size).
- In low memory, we divide up the available memory to produce the spill and merge batch sizes. The sizes will be less than the configured preference.
The sort has two key configured parameters: the spill file size and the size of the output (downstream) batch. The spill file size is chosen to be large enough to ensure efficient I/O, but not so large as to overwhelm any one spill directory. The output batch size is chosen to be large enough to amortize the per-batch overhead over the maximum number of records, but not so large as to overwhelm downstream operators. Setting these parameters is a judgment call.
Under limited memory, the above sizes may be too big for the space available. For example, the default spill file size is 256 MB. But, if the sort is only given 50 MB, then spill files will be smaller. The default output batch size is 16 MB, but if the sort is given only 20 MB, then the output batch must be smaller. The low memory logic starts with the memory available and works backwards to figure out spill batch size, output batch size and spill file size. The sizes will be smaller than optimal, but as large as will fit in the memory provided.
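The backward calculation described above can be sketched in a few lines. This is a hypothetical illustration, not Drill's actual implementation: the class name, the one-quarter cap, and the helper methods are assumptions; only the 256 MB spill file and 16 MB output batch defaults come from the text.

```java
// Hypothetical sketch of the low-memory fallback: start from the memory
// actually granted and work backwards to batch and file sizes.
class LowMemorySizing {

    static final long PREFERRED_SPILL_FILE_SIZE = 256L * 1024 * 1024; // 256 MB default
    static final int PREFERRED_OUTPUT_BATCH_SIZE = 16 * 1024 * 1024;  // 16 MB default

    // Assumption: reserve room for several working batches by capping the
    // output batch at a quarter of the sort's memory budget.
    static int outputBatchSize(long memoryLimit) {
        long cap = memoryLimit / 4;
        return (int) Math.min(PREFERRED_OUTPUT_BATCH_SIZE, cap);
    }

    // A spill file larger than the whole budget cannot be filled in one
    // load phase, so shrink it to fit.
    static long spillFileSize(long memoryLimit) {
        return Math.min(PREFERRED_SPILL_FILE_SIZE, memoryLimit);
    }

    public static void main(String[] args) {
        // With only 20 MB, the 16 MB preference no longer fits: 5 MB here.
        System.out.println(outputBatchSize(20L * 1024 * 1024));   // 5242880
        // With ample memory, the configured preference wins.
        System.out.println(outputBatchSize(1024L * 1024 * 1024)); // 16777216
    }
}
```

The sizes that result are smaller than optimal but, as the text notes, as large as will fit in the memory provided.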
-
Nested Class Summary
Modifier and Type | Class | Description
- static class
- static enum
- static class
-
Field Summary
Modifier and Type | Field | Description
- static final double BUFFER_FROM_PAYLOAD: Given a data size, this is the multiplier to create the buffer size estimate.
- static final double INTERNAL_FRAGMENTATION_ESTIMATE: Estimate for typical internal fragmentation in a buffer due to power-of-two rounding on vectors.
- static final double LOW_MEMORY_MERGE_BATCH_RATIO
- static final int MIN_ROWS_PER_SORT_BATCH: Desperate attempt to keep spill batches from being too small in low memory.
- static final double PAYLOAD_FROM_BUFFER: Given a buffer, this is the assumed amount of space available for data.
- static final double WORST_CASE_BUFFER_RATIO: On really bad days, we will add one more byte (or value) to a vector than fits in a power-of-two sized buffer, forcing a doubling.
-
Constructor Summary
-
Method Summary
Modifier and Type | Method | Description
- SortMemoryManager.MergeTask consolidateBatches(long allocMemory, int inMemCount, int spilledRunsCount): Choose a consolidation option during the merge phase depending on memory available.
- long freeMemory(long allocatedBytes)
- long getBufferMemoryLimit()
- getInputBatchSize()
- long getMemoryLimit()
- int getMergeBatchRowCount()
- getMergeBatchSize()
- long getMergeMemoryLimit()
- int getPreferredMergeBatchSize()
- int getPreferredSpillBatchSize()
- int getRowWidth()
- int getSpillBatchRowCount()
- getSpillBatchSize()
- boolean hasMemoryMergeCapacity(long allocatedBytes, long neededForInMemorySort)
- boolean hasPerformanceWarning()
- boolean isLowMemory()
- boolean isSpillNeeded(long allocatedBytes, long incomingSize)
- boolean mayOverflow()
- static int multiply(int byteSize, double multiplier)
- boolean updateEstimates(int batchDataSize, int batchRowWidth, int batchRowCount): Update the data-driven memory use numbers.
-
Field Details
-
INTERNAL_FRAGMENTATION_ESTIMATE
public static final double INTERNAL_FRAGMENTATION_ESTIMATE
Estimate for typical internal fragmentation in a buffer due to power-of-two rounding on vectors.

[____|__$__]

In the above, the brackets represent the whole vector. The first half is always full; the $ represents the end of data. When the first half filled, the second half was allocated. On average, the second half will be half full, which means that, on average, 1/4 of the allocated space is unused (the definition of internal fragmentation).
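The 1/4 figure can be checked with a quick simulation. The sketch below is illustrative only and shares no code with Drill: it models a vector whose power-of-two buffer doubles as values arrive, then averages the unused fraction over many fill levels.

```java
// Demonstrates the average-internal-fragmentation argument: a vector that
// doubles its power-of-two buffer wastes, on average, about 1/4 of it.
class FragmentationDemo {

    // Smallest power of two >= n: the buffer a doubling vector would hold.
    static int allocFor(int n) {
        int cap = 1;
        while (cap < n) cap <<= 1;
        return cap;
    }

    // Average fraction of the buffer left unused over fill levels 1..max.
    static double avgUnusedFraction(int max) {
        double total = 0;
        for (int n = 1; n <= max; n++) {
            int cap = allocFor(n);
            total += (cap - n) / (double) cap;
        }
        return total / max;
    }

    public static void main(String[] args) {
        // Averaged over many fill levels, the unused share approaches 1/4.
        System.out.println(avgUnusedFraction(1 << 20));
    }
}
```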
-
PAYLOAD_FROM_BUFFER
public static final double PAYLOAD_FROM_BUFFER
Given a buffer, this is the assumed amount of space available for data. (Adding more will double the buffer size half the time.)
-
BUFFER_FROM_PAYLOAD
public static final double BUFFER_FROM_PAYLOAD
Given a data size, this is the multiplier to create the buffer size estimate. (Note: since we work with aggregate batches, we cannot simply round up to the next power of two; rounding is done on a vector-by-vector basis. Here we need to estimate the aggregate effect of rounding.)
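These two constants act as reciprocal conversion factors between data (payload) size and buffer size. The sketch below shows how such factors would be applied; the 0.70 value is an assumption chosen for illustration, not the actual constant in Drill.

```java
// Hypothetical use of reciprocal payload/buffer multipliers: convert an
// observed data size into a buffer-size estimate and back.
class BufferEstimate {

    static final double PAYLOAD_FROM_BUFFER = 0.70;               // assumed value
    static final double BUFFER_FROM_PAYLOAD = 1.0 / PAYLOAD_FROM_BUFFER;

    // Buffer space we expect to need to hold the given amount of data,
    // accounting for aggregate power-of-two rounding across vectors.
    static long bufferForData(long dataSize) {
        return Math.round(dataSize * BUFFER_FROM_PAYLOAD);
    }

    // Data that safely fits in a buffer of the given size without forcing
    // half the vectors to double.
    static long dataForBuffer(long bufferSize) {
        return Math.round(bufferSize * PAYLOAD_FROM_BUFFER);
    }
}
```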
-
WORST_CASE_BUFFER_RATIO
public static final double WORST_CASE_BUFFER_RATIO
On really bad days, we will add one more byte (or value) to a vector than fits in a power-of-two sized buffer, forcing a doubling. In this case, half the resulting buffer is empty.
-
MIN_ROWS_PER_SORT_BATCH
public static final int MIN_ROWS_PER_SORT_BATCH
Desperate attempt to keep spill batches from being too small in low memory. The number is also used for logging: the system will log a warning if batches fall below this number, which may indicate too little memory allocated for the job at hand. (Queries operate on big data: many records. Batches with too few records are a probable performance hit. But what is too few? It is a judgment call.)
-
LOW_MEMORY_MERGE_BATCH_RATIO
public static final double LOW_MEMORY_MERGE_BATCH_RATIO
-
-
Constructor Details
-
SortMemoryManager
-
-
Method Details
-
updateEstimates
public boolean updateEstimates(int batchDataSize, int batchRowWidth, int batchRowCount)
Update the data-driven memory use numbers, including:
- The average size of incoming records.
- The estimated spill and output batch size.
- The estimated number of average-size records per spill and output batch.
- The amount of memory set aside to hold the incoming batches before spilling starts.
Under normal circumstances, the amount of memory available is much larger than the input, spill or merge batch sizes. The primary question is to determine how many input batches we can buffer during the load phase, and how many spill batches we can merge during the merge phase.
- Parameters:
batchDataSize - the overall size of the current batch received from upstream
batchRowWidth - the average width in bytes (including overhead) of rows in the current input batch
batchRowCount - the number of actual (not filtered) records in that upstream batch
- Returns:
- true if the estimates changed, false if the previous estimates remain valid
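Two of the estimates this method maintains can be sketched as below. The class, constants, and formulas are illustrative assumptions, not Drill's code; only the inputs (row width, row count, data size) mirror the documented parameters.

```java
// Hypothetical sketch of data-driven estimate updates: rows per spill
// batch from observed row width, and a change-detection check that mirrors
// the boolean return described above.
class EstimateUpdate {

    static final int SPILL_BATCH_SIZE = 8 * 1024 * 1024; // assumed configured size

    // Rows of the observed average width that fit in one spill batch,
    // floored at one row so huge rows still make progress.
    static int rowsPerSpillBatch(int batchRowWidth) {
        return Math.max(1, SPILL_BATCH_SIZE / Math.max(1, batchRowWidth));
    }

    // Assumption: a row-width shift of more than 5% invalidates the
    // previous estimates, so the caller must recompute memory plans.
    static boolean estimatesChanged(int oldRowWidth, int newRowWidth) {
        return Math.abs(newRowWidth - oldRowWidth) > oldRowWidth * 0.05;
    }
}
```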
-
consolidateBatches
public SortMemoryManager.MergeTask consolidateBatches(long allocMemory, int inMemCount, int spilledRunsCount)
Choose a consolidation option during the merge phase depending on the memory available. Preference is given to moving directly on to merging (with no additional spilling) when possible. But if memory pressure does not allow this, we must spill batches and/or merge on-disk spilled runs to reduce the final set of runs to something that can be merged in the available memory. The logic lives here (returning an enum) rather than in the merge code to allow unit testing without actually needing batches in memory.
- Parameters:
allocMemory - amount of memory currently allocated (this class knows the total memory available)
inMemCount - number of incoming batches in memory (the number is important, not the in-memory size; we get the memory size from allocMemory)
spilledRunsCount - the number of runs sitting on disk to be merged
- Returns:
- whether to SPILL in-memory batches, whether to MERGE on-disk batches to create a new, larger run, or whether to do nothing (NONE) and instead advance to the final merge
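The decision among the three return values can be sketched as follows. The enum names mirror the documented outcomes; the memory budget, merge batch size, fan-in limit, and fitness test are assumptions for illustration, and the real method also weighs allocMemory (ignored here).

```java
// Minimal sketch of the merge-phase consolidation decision: merge now if
// everything fits, otherwise free memory (SPILL) or shrink the run count
// by consolidating on-disk runs (MERGE).
class ConsolidationSketch {

    enum MergeTask { NONE, SPILL, MERGE }

    static final long MEMORY_LIMIT = 40L * 1024 * 1024;  // assumed sort budget
    static final long MERGE_BATCH_SIZE = 4L * 1024 * 1024;
    static final int MAX_MERGE_WIDTH = 8;                // assumed merge fan-in

    static MergeTask consolidateBatches(long allocMemory, int inMemCount,
                                        int spilledRunsCount) {
        int runCount = inMemCount + spilledRunsCount;
        // Final merge needs one input batch per run plus one output batch.
        // (allocMemory would also factor in; this sketch ignores it.)
        long neededForMerge = (runCount + 1L) * MERGE_BATCH_SIZE;
        boolean fits = neededForMerge <= MEMORY_LIMIT
                    && runCount <= MAX_MERGE_WIDTH;
        if (fits) {
            return MergeTask.NONE;   // advance straight to the final merge
        }
        if (inMemCount > 0) {
            return MergeTask.SPILL;  // free memory by spilling in-memory batches
        }
        return MergeTask.MERGE;      // consolidate on-disk runs into larger ones
    }
}
```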
-
multiply
public static int multiply(int byteSize, double multiplier)
-
isSpillNeeded
public boolean isSpillNeeded(long allocatedBytes, long incomingSize)
-
hasMemoryMergeCapacity
public boolean hasMemoryMergeCapacity(long allocatedBytes, long neededForInMemorySort)
-
freeMemory
public long freeMemory(long allocatedBytes)
-
getMergeMemoryLimit
public long getMergeMemoryLimit()
-
getSpillBatchRowCount
public int getSpillBatchRowCount()
-
getMergeBatchRowCount
public int getMergeBatchRowCount()
-
getMemoryLimit
public long getMemoryLimit()
-
getRowWidth
public int getRowWidth()
-
getInputBatchSize
-
getPreferredSpillBatchSize
public int getPreferredSpillBatchSize()
-
getPreferredMergeBatchSize
public int getPreferredMergeBatchSize()
-
getSpillBatchSize
-
getMergeBatchSize
-
getBufferMemoryLimit
public long getBufferMemoryLimit()
-
mayOverflow
public boolean mayOverflow()
-
isLowMemory
public boolean isLowMemory()
-
hasPerformanceWarning
public boolean hasPerformanceWarning()
-