Class SortMemoryManager

java.lang.Object
org.apache.drill.exec.physical.impl.xsort.SortMemoryManager

public class SortMemoryManager extends Object
Computes the memory needs for input batches, spill batches and merge batches. The key challenges that this code tries to overcome are:
  • Drill is not designed for small memory allocations, but the planner may provide them anyway: the memory per query is divided among slices (minor fragments) and among buffering operators, leaving very little per operator.
  • Drill does not provide the detailed memory information needed to carefully manage memory in tight constraints.
  • But, Drill has a death penalty for going over the memory limit.
As a result, this class is a bit of a hack: it attempts to weigh a number of ill-defined factors in order to divide up memory use in a way that prevents OOM errors.

First, it is necessary to differentiate two concepts:

  • The data size of a batch: the amount of memory needed to hold the data itself. The data size is constant for any given batch.
  • The buffer size of the buffers that hold the data. The buffer size varies wildly depending on how the batch was produced.
The three kinds of buffer layouts seen to date include:
  • One buffer per vector component (data, offsets, null flags, etc.) – created by readers, the project operator, and others.
  • One buffer for the entire batch, with each vector component using a slice of the overall buffer – the case for batches deserialized from exchanges.
  • One buffer for each top-level vector, with component vectors using slices of the overall vector buffer – the result of reading spilled batches from disk.
In each case, buffer sizes are power-of-two rounded from the data size. But since the data is grouped differently in each case, the resulting buffer sizes vary considerably.
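To see why the grouping matters, consider a minimal sketch (hypothetical, not Drill code) that rounds buffer sizes up to the next power of two the way an allocator would. The same 3000 bytes of data cost different amounts of memory depending on whether it sits in one buffer or three:

```java
// Hypothetical illustration, not Drill code: power-of-two rounding
// inflates memory differently depending on how data is grouped into buffers.
public class RoundingDemo {

    // Round a size up to the next power of two, as a memory allocator would.
    static long roundUpPow2(long size) {
        long highBit = Long.highestOneBit(size);
        return (highBit == size) ? size : highBit * 2;
    }

    public static void main(String[] args) {
        // Same 3000 bytes of data, grouped two different ways.
        long[] perVector = {1000, 1000, 1000};    // one buffer per vector
        long wholeBatch = 3000;                   // one buffer for the batch

        long separate = 0;
        for (long b : perVector) {
            separate += roundUpPow2(b);           // 3 x 1024 = 3072
        }
        long combined = roundUpPow2(wholeBatch);  // 4096

        System.out.println("per-vector buffers: " + separate);
        System.out.println("single buffer:      " + combined);
    }
}
```

The data size (3000 bytes) is identical in both layouts, yet the buffer sizes differ, which is exactly why the class must estimate rather than compute memory needs.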

As a result, we can never be sure of the amount of memory needed for a batch. So, we have to estimate based on a number of factors:

  • The RecordBatchSizer's estimate of the data size and buffer size of each incoming batch.
  • An estimate of the internal fragmentation caused by power-of-two rounding.
  • The configured preferences for spill and output batch sizes.
The code handles "normal" and "low" memory conditions.
  • In normal memory, we simply work out the number of preferred-size batches that fit in memory (based on the predicted buffer size).
  • In low memory, we divide up the available memory to produce the spill and merge batch sizes. The sizes will be less than the configured preference.

The sort has two key configured parameters: the spill file size and the size of the output (downstream) batch. The spill file size is chosen to be large enough to ensure efficient I/O, but not so large as to overwhelm any one spill directory. The output batch size is chosen to be large enough to amortize the per-batch overhead over the maximum number of records, but not so large as to overwhelm downstream operators. Setting these parameters is a judgment call.

Under limited memory, the above sizes may be too big for the space available. For example, the default spill file size is 256 MB. But, if the sort is only given 50 MB, then spill files will be smaller. The default output batch size is 16 MB, but if the sort is given only 20 MB, then the output batch must be smaller. The low memory logic starts with the memory available and works backwards to figure out spill batch size, output batch size and spill file size. The sizes will be smaller than optimal, but as large as will fit in the memory provided.
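The back-of-envelope arithmetic can be sketched as follows. This is not the actual SortMemoryManager logic; the defaults (256 MB spill file, 16 MB output batch) come from the text above, while the one-quarter-of-memory cap on the output batch is an assumed policy, chosen purely for illustration:

```java
// Illustrative sketch only (not the actual SortMemoryManager logic):
// in low memory, start from the memory available and shrink the
// configured defaults to fit, rather than using them as-is.
public class LowMemorySketch {

    static final long DEFAULT_SPILL_FILE_SIZE   = 256L * 1024 * 1024; // 256 MB
    static final long DEFAULT_OUTPUT_BATCH_SIZE =  16L * 1024 * 1024; // 16 MB

    // Assumed policy: cap the output batch at a fraction of memory so
    // several batches (plus working space) still fit.
    static long outputBatchSize(long memoryLimit) {
        long capped = memoryLimit / 4;  // assumed fraction, for illustration
        return Math.min(DEFAULT_OUTPUT_BATCH_SIZE, capped);
    }

    // Spill files cannot usefully exceed what the sort can buffer.
    static long spillFileSize(long memoryLimit) {
        return Math.min(DEFAULT_SPILL_FILE_SIZE, memoryLimit);
    }
}
```

With the 20 MB example from the text, `outputBatchSize` shrinks the 16 MB default to 5 MB; with ample memory, the configured default wins.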

  • Field Details

    • INTERNAL_FRAGMENTATION_ESTIMATE

      public static final double INTERNAL_FRAGMENTATION_ESTIMATE
      Estimate for typical internal fragmentation in a buffer due to power-of-two rounding on vectors.

      [____|__$__]
      In the above, the brackets represent the whole vector. The first half is always full; the $ marks the end of the data. When the first half was filled, the second half was allocated. On average, the second half will be half full. This means that, on average, 1/4 of the allocated space is unused (the definition of internal fragmentation).
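      The 1/4 figure falls out of simple arithmetic, worked through in this small sketch:

```java
// Worked arithmetic behind the 1/4 estimate: the first half of a
// just-doubled buffer is full; the second half is, on average, half full.
public class FragmentationMath {

    // Expected unused fraction of the whole buffer.
    static double expectedFragmentation() {
        double firstHalfUnused = 0.0;         // first half: always full
        double secondHalfUnused = 0.5 * 0.5;  // half the buffer, half empty on average
        return firstHalfUnused + secondHalfUnused;
    }
}
```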
      See Also:
    • PAYLOAD_FROM_BUFFER

      public static final double PAYLOAD_FROM_BUFFER
      Given a buffer, this is the assumed amount of space available for data. (Adding more will double the buffer size half the time.)
      See Also:
    • BUFFER_FROM_PAYLOAD

      public static final double BUFFER_FROM_PAYLOAD
      Given a data size, this is the multiplier to create the buffer size estimate. (Note: since we work with aggregate batches, we cannot simply round up to the next power of two: rounding is done on a vector-by-vector basis. Here we need to estimate the aggregate effect of rounding.
      See Also:
    • WORST_CASE_BUFFER_RATIO

      public static final double WORST_CASE_BUFFER_RATIO
      On really bad days, we will add one more byte (or value) to a vector than fits in a power-of-two sized buffer, forcing a doubling. In this case, half the resulting buffer is empty.
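      The three ratio constants hang together arithmetically. The sketch below assumes the 1/4 fragmentation estimate described above and derives the other ratios from it; the actual constant values live in Drill's source and may differ:

```java
// Sketch of how the ratio constants relate, assuming the 1/4 internal
// fragmentation estimate (the authoritative values are in Drill's source).
public class RatioSketch {

    static final double INTERNAL_FRAGMENTATION_ESTIMATE = 1.0 / 4.0;

    // Usable payload in a buffer: the non-fragmented portion.
    static final double PAYLOAD_FROM_BUFFER =
            1.0 - INTERNAL_FRAGMENTATION_ESTIMATE;          // 3/4

    // Inverse: buffer needed to hold a given payload.
    static final double BUFFER_FROM_PAYLOAD =
            1.0 / PAYLOAD_FROM_BUFFER;                      // 4/3

    // Worst case: one extra value forces a doubling, leaving half empty.
    static final double WORST_CASE_BUFFER_RATIO = 2.0;

    // Estimated buffer size for a given data size.
    static long bufferForPayload(long dataSize) {
        return Math.round(dataSize * BUFFER_FROM_PAYLOAD);
    }
}
```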
      See Also:
    • MIN_ROWS_PER_SORT_BATCH

      public static final int MIN_ROWS_PER_SORT_BATCH
      Desperate attempt to keep spill batches from being too small in low memory.

      The number is also used for logging: the system logs a warning if batches fall below this count, which may indicate that too little memory was allocated for the job at hand. (Queries operate on big data: many records. Batches with too few records are a probable performance hit. But what counts as too few is a judgment call.)

      See Also:
    • LOW_MEMORY_MERGE_BATCH_RATIO

      public static final double LOW_MEMORY_MERGE_BATCH_RATIO
      See Also:
  • Constructor Details

    • SortMemoryManager

      public SortMemoryManager(SortConfig config, long opMemoryLimit)
  • Method Details

    • updateEstimates

      public boolean updateEstimates(int batchDataSize, int batchRowWidth, int batchRowCount)
      Update the data-driven memory use numbers including:
      • The average size of incoming records.
      • The estimated spill and output batch size.
      • The estimated number of average-size records per spill and output batch.
      • The amount of memory set aside to hold the incoming batches before spilling starts.

      Under normal circumstances, the amount of memory available is much larger than the input, spill or merge batch sizes. The primary question is to determine how many input batches we can buffer during the load phase, and how many spill batches we can merge during the merge phase.

      Parameters:
      batchDataSize - the overall size of the current batch received from upstream
      batchRowWidth - the average width in bytes (including overhead) of rows in the current input batch
      batchRowCount - the number of actual (not filtered) records in that upstream batch
      Returns:
      true if the estimates changed, false if the previous estimates remain valid
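      The normal-memory arithmetic this method drives can be sketched in miniature. This is not the actual implementation; the 4/3 fragmentation multiplier is the assumption discussed in the field documentation above, and the helper names are hypothetical:

```java
// Illustrative sketch (not the actual updateEstimates implementation):
// in normal memory, the central question is how many buffered input
// batches fit before spilling must start.
public class EstimateSketch {

    // Estimated buffer size of a batch, padding the data size for
    // power-of-two internal fragmentation (4/3 multiplier assumed).
    static long estimatedBufferSize(long batchDataSize) {
        return Math.round(batchDataSize * 4.0 / 3.0);
    }

    // Number of average-size input batches that fit in the memory budget.
    static long batchesThatFit(long memoryLimit, long batchDataSize) {
        return memoryLimit / estimatedBufferSize(batchDataSize);
    }
}
```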
    • consolidateBatches

      public SortMemoryManager.MergeTask consolidateBatches(long allocMemory, int inMemCount, int spilledRunsCount)
      Choose a consolidation option during the merge phase depending on memory available. Preference is given to moving directly onto merging (with no additional spilling) when possible. But, if memory pressures don't allow this, we must spill batches and/or merge on-disk spilled runs, to reduce the final set of runs to something that can be merged in the available memory.

      Logic is here (returning an enum) rather than in the merge code to allow unit testing without actually needing batches in memory.

      Parameters:
      allocMemory - amount of memory currently allocated (this class knows the total memory available)
      inMemCount - number of incoming batches in memory (the number is important, not the in-memory size; we get the memory size from allocMemory)
      spilledRunsCount - the number of runs sitting on disk to be merged
      Returns:
      whether to SPILL in-memory batches, whether to MERGE on-disk batches to create a new, larger run, or whether to do nothing (NONE) and instead advance to the final merge
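      The decision structure lends itself to exactly the kind of unit test mentioned above. The following is a hedged sketch of such a decision function, not Drill's actual consolidateBatches logic; the enum names mirror the return values described here, while the `maxMergeWidth` parameter and the ordering of checks are assumptions for illustration:

```java
// Illustrative decision sketch (not Drill's actual consolidateBatches):
// prefer advancing straight to the final merge; otherwise reduce the
// number of runs by spilling or merging on-disk runs first.
public class ConsolidateSketch {

    enum MergeAction { SPILL, MERGE, NONE }

    static MergeAction choose(long allocMemory, long memoryLimit,
                              int inMemCount, int spilledRunsCount,
                              int maxMergeWidth) {
        // If everything fits within memory and merge width, do nothing extra.
        if (allocMemory <= memoryLimit
                && inMemCount + spilledRunsCount <= maxMergeWidth) {
            return MergeAction.NONE;
        }
        // Free memory first by spilling in-memory batches, if any remain.
        if (inMemCount > 0) {
            return MergeAction.SPILL;
        }
        // Otherwise consolidate on-disk runs into fewer, larger runs.
        return MergeAction.MERGE;
    }
}
```

Because the function takes only counts and sizes, it can be exercised in a test without allocating any batches, which is the design point the documentation makes.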
    • multiply

      public static int multiply(int byteSize, double multiplier)
    • isSpillNeeded

      public boolean isSpillNeeded(long allocatedBytes, long incomingSize)
    • hasMemoryMergeCapacity

      public boolean hasMemoryMergeCapacity(long allocatedBytes, long neededForInMemorySort)
    • freeMemory

      public long freeMemory(long allocatedBytes)
    • getMergeMemoryLimit

      public long getMergeMemoryLimit()
    • getSpillBatchRowCount

      public int getSpillBatchRowCount()
    • getMergeBatchRowCount

      public int getMergeBatchRowCount()
    • getMemoryLimit

      public long getMemoryLimit()
    • getRowWidth

      public int getRowWidth()
    • getInputBatchSize

      public SortMemoryManager.BatchSizeEstimate getInputBatchSize()
    • getPreferredSpillBatchSize

      public int getPreferredSpillBatchSize()
    • getPreferredMergeBatchSize

      public int getPreferredMergeBatchSize()
    • getSpillBatchSize

      public SortMemoryManager.BatchSizeEstimate getSpillBatchSize()
    • getMergeBatchSize

      public SortMemoryManager.BatchSizeEstimate getMergeBatchSize()
    • getBufferMemoryLimit

      public long getBufferMemoryLimit()
    • mayOverflow

      public boolean mayOverflow()
    • isLowMemory

      public boolean isLowMemory()
    • hasPerformanceWarning

      public boolean hasPerformanceWarning()