Class ExternalSortBatch

java.lang.Object
org.apache.drill.exec.record.AbstractRecordBatch<ExternalSort>
org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch
All Implemented Interfaces:
AutoCloseable, Iterable<VectorWrapper<?>>, CloseableRecordBatch, RecordBatch, VectorAccessible

public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort>
External sort batch: a sort batch which can spill to disk in order to operate within a defined memory footprint.

Basic Operation

The operator has three key phases:

  • The load phase in which batches are read from upstream.
  • The merge phase in which spilled batches are combined to reduce the number of files below the configured limit. (Best practice is to configure the system to avoid this phase.)
  • The delivery phase in which batches are combined to produce the final output.
During the load phase:

  • The incoming (upstream) operator provides a series of batches.
  • This operator sorts each batch and accumulates it in an in-memory buffer.
  • If the in-memory buffer becomes too large, this operator selects a subset of the buffered batches to spill.
  • Each spill set is merged into a new, sorted collection of batches, which is then spilled to disk.
  • To allow the use of multiple disks, each spill group is written round-robin to a set of spill directories.
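The load-phase steps above can be modeled with a small, self-contained sketch. Everything here is hypothetical: batches are plain int[] arrays, memory is measured in records rather than bytes, a run is a single sorted array rather than a series of batches, spill directories are in-memory lists, and the sketch spills the entire buffer rather than a chosen subset. It illustrates only the sort, buffer, spill, and round-robin pattern, not Drill's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the load phase. Not Drill code.
class LoadPhaseSketch {
    final int memLimitRecords;          // stand-in for the memory limit, in records
    final List<List<int[]>> spillDirs;  // one list of spilled runs per "directory"
    final List<int[]> buffered = new ArrayList<>();
    int bufferedRecords = 0;
    int nextDir = 0;                    // round-robin cursor over spillDirs

    LoadPhaseSketch(int memLimitRecords, int dirCount) {
        this.memLimitRecords = memLimitRecords;
        this.spillDirs = new ArrayList<>();
        for (int i = 0; i < dirCount; i++) {
            spillDirs.add(new ArrayList<>());
        }
    }

    // Sort the incoming batch, buffer it, and spill if the buffer is too large.
    void addBatch(int[] incoming) {
        int[] sorted = incoming.clone();
        Arrays.sort(sorted);
        buffered.add(sorted);
        bufferedRecords += sorted.length;
        if (bufferedRecords > memLimitRecords) {
            spill();
        }
    }

    // Combine all buffered batches into one sorted run and write it to the
    // next directory in round-robin order. Arrays.sort stands in for a
    // proper merge of the already-sorted batches.
    void spill() {
        int[] run = new int[bufferedRecords];
        int pos = 0;
        for (int[] b : buffered) {
            System.arraycopy(b, 0, run, pos, b.length);
            pos += b.length;
        }
        Arrays.sort(run);
        spillDirs.get(nextDir).add(run);
        nextDir = (nextDir + 1) % spillDirs.size();
        buffered.clear();
        bufferedRecords = 0;
    }
}
```

With a limit of 4 records and two directories, the first spill lands in directory 0 and the second in directory 1.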

Data is spilled to disk as a "run". A run consists of one or more (typically many) batches, each of which is itself a sorted run of records.

During the sort/merge phase:

  • When the input operator is complete, this operator merges the accumulated batches (which may be all in memory or partially on disk), and returns them to the output (downstream) operator in chunks of no more than 64K records.
  • The final merge must combine a collection of in-memory and spilled batches. Several limits apply to the maximum "width" of this merge. For example, each open spill run consumes a file handle, and we may wish to limit the number of file handles. Further, memory must hold one batch from each run, so we may need to reduce the number of runs so that the remaining runs can fit into memory. A consolidation phase combines in-memory and spilled batches prior to the final merge to control final merge width.
  • A special case occurs if no batches were spilled. In this case, the input batches are sorted in memory without merging.
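At its core, the final merge is a k-way merge. The sketch below assumes each run is a sorted int[] and uses java.util.PriorityQueue to pick the smallest head record across runs, emitting output chunks of at most maxBatchSize records (the operator itself caps chunks at 64K records). The class and method names are illustrative, not Drill APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a k-way merge over sorted runs. Not Drill code.
class FinalMergeSketch {
    static List<int[]> merge(List<int[]> runs, int maxBatchSize) {
        // Heap entries are {value, runIndex, offsetInRun}, ordered by value.
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (runs.get(r).length > 0) {
                heap.add(new int[]{runs.get(r)[0], r, 0});
            }
        }
        List<int[]> output = new ArrayList<>();
        int[] chunk = new int[maxBatchSize];
        int filled = 0;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            chunk[filled++] = top[0];
            // Advance within the run the smallest record came from.
            int[] run = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < run.length) {
                heap.add(new int[]{run[next], top[1], next});
            }
            if (filled == maxBatchSize) {  // chunk is full: emit it
                output.add(chunk);
                chunk = new int[maxBatchSize];
                filled = 0;
            }
        }
        if (filled > 0) {
            output.add(Arrays.copyOf(chunk, filled));  // final partial chunk
        }
        return output;
    }
}
```

Note that the heap holds one entry per open run, which mirrors why merge width matters: each open run costs a file handle and at least one batch of memory.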

Many complex details are involved in doing the above; the details are explained in the methods of this class.

Configuration Options

drill.exec.sort.external.spill.fs
The file system (file://, hdfs://, etc.) of the spill directory.
drill.exec.sort.external.spill.directories
The comma-delimited list of directories, on the above file system, to which spill files are written in round-robin fashion. The query will fail if any one of the directories becomes full.
drill.exec.sort.external.spill.file_size
Target size for first-generation spill files. Set this large enough to get nice long writes, but not so large that spill directories are overwhelmed.
drill.exec.sort.external.mem_limit
Maximum memory to use for the in-memory buffer. (Primarily for testing.)
drill.exec.sort.external.batch_limit
Maximum number of batches to hold in memory. (Primarily for testing.)
drill.exec.sort.external.spill.max_count
Maximum number of batches to add to "first generation" files. Defaults to 0 (no limit). (Primarily for testing.)
drill.exec.sort.external.spill.min_count
Minimum number of batches to add to "first generation" files. Defaults to 0 (no limit). (Primarily for testing.)
drill.exec.sort.external.merge_limit
Sets the maximum number of runs to be merged in a single pass (limits the number of open files).
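To see how merge_limit bounds merge width, consider how many intermediate merges are needed before the final merge can open every remaining run at once. This hypothetical helper assumes each intermediate merge combines up to mergeLimit runs into one and ignores memory; Drill's actual consolidation logic also accounts for memory and may choose differently.

```java
// Hypothetical helper, not Drill code: counts the intermediate merges needed
// so that the final merge opens no more than mergeLimit runs.
class MergePlanSketch {
    static int intermediateMerges(int runCount, int mergeLimit) {
        if (mergeLimit < 2) {
            throw new IllegalArgumentException("mergeLimit must be at least 2");
        }
        int merges = 0;
        while (runCount > mergeLimit) {
            // Merging mergeLimit runs into one removes (mergeLimit - 1) runs.
            runCount -= mergeLimit - 1;
            merges++;
        }
        return merges;
    }
}
```

For example, 10 runs with a limit of 4 need two intermediate merges (10 runs, then 7, then 4).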

The memory limit observed by this operator is the lesser of:

  • The maximum allocation allowed by the allocator assigned to this batch, as set by the Foreman, or
  • The maximum limit configured in the mem_limit parameter above. (Primarily for testing.)
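The rule above reduces to taking a minimum. In this sketch, treating a non-positive mem_limit as "not configured" is an assumption made for illustration, and the class and method names are hypothetical.

```java
// Hypothetical sketch, not Drill code: the effective memory budget is the
// lesser of the allocator limit and the configured mem_limit.
class MemoryLimitSketch {
    static long effectiveLimit(long allocatorLimit, long configuredMemLimit) {
        // Assumption: a non-positive mem_limit means "not set".
        if (configuredMemLimit <= 0) {
            return allocatorLimit;
        }
        return Math.min(allocatorLimit, configuredMemLimit);
    }
}
```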

Output

The sort operator produces one of two kinds of output batches:

  • A large output with sv4 if data is sorted in memory. The sv4 addresses the entire in-memory sort set. A selection vector remover will copy results into new batches of a size determined by that operator.
  • A series of batches, without a selection vector, if the sort spills to disk. In this case, the downstream operator will still be a selection vector remover, but there is nothing for that operator to remove.
Note that, even in the in-memory sort case, this operator could do the copying to eliminate the extra selection vector remover. That is left as an exercise for another time.
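Conceptually, the selection vector remover walks the SV4 in order and copies each addressed record into a fresh output batch. The sketch assumes each SV4 entry packs a batch index into the upper 16 bits and a record offset into the lower 16 bits; the batches are plain int[] arrays and the class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of SV4 removal, not Drill code.
class Sv4Sketch {
    // Assumed encoding: batch index in the high 16 bits, record offset in the low 16.
    static int encode(int batchIndex, int recordIndex) {
        return (batchIndex << 16) | (recordIndex & 0xFFFF);
    }

    // Copy records in SV4 order out of the in-memory batches, producing
    // output batches of at most outputBatchSize records.
    static List<int[]> removeSv4(List<int[]> batches, int[] sv4, int outputBatchSize) {
        List<int[]> out = new ArrayList<>();
        for (int start = 0; start < sv4.length; start += outputBatchSize) {
            int len = Math.min(outputBatchSize, sv4.length - start);
            int[] copy = new int[len];
            for (int i = 0; i < len; i++) {
                int entry = sv4[start + i];
                int batch = entry >>> 16;     // which in-memory batch
                int record = entry & 0xFFFF;  // which record within it
                copy[i] = batches.get(batch)[record];
            }
            out.add(copy);
        }
        return out;
    }
}
```

This copy is the work that, as noted above, the sort operator could in principle do itself.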

Logging

Logging in this operator serves two purposes:
  • Normal diagnostic information.
  • Capturing the essence of the operator functionality for analysis in unit tests.

Test logging is designed to capture key events and timings. Take care when changing or removing log messages, as you may need to adjust unit tests accordingly.
    • Method Details

      • getRecordCount

        public int getRecordCount()
        Description copied from interface: VectorAccessible
        Get the number of records.
        Returns:
        number of records
      • getSelectionVector4

        public SelectionVector4 getSelectionVector4()
        Returns outputSV4 rather than the SV4 of resultsIterator. When resultsIterator has a null SV4, outputSV4 will be empty. However, a Sort with an EMIT outcome is expected to fail in those cases (for example, spilling scenarios) while preparing the output container, since that combination is not currently supported.
        Specified by:
        getSelectionVector4 in interface VectorAccessible
        Overrides:
        getSelectionVector4 in class AbstractRecordBatch<ExternalSort>
      • getSelectionVector2

        public SelectionVector2 getSelectionVector2()
        Specified by:
        getSelectionVector2 in interface VectorAccessible
        Overrides:
        getSelectionVector2 in class AbstractRecordBatch<ExternalSort>
      • buildSchema

        public void buildSchema()
        Called by AbstractRecordBatch as a fast path to obtain the first record batch and set up the schema of this batch, in order to quickly return the schema to the client. Note that this method fetches the first batch from upstream, which will be waiting for us the first time that innerNext() is called.
        Overrides:
        buildSchema in class AbstractRecordBatch<ExternalSort>
      • innerNext

        public RecordBatch.IterOutcome innerNext()
        Process each request for a batch. The first request retrieves all the incoming batches and sorts them, optionally spilling to disk as needed. Subsequent calls retrieve the sorted results in fixed-size batches.
        Specified by:
        innerNext in class AbstractRecordBatch<ExternalSort>
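The two-stage behavior of innerNext() can be illustrated with a toy state machine. The Outcome enum stands in for RecordBatch.IterOutcome, upstream input is an Iterator<int[]>, and sorting happens entirely in memory; none of these names are Drill APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of innerNext(), not Drill code.
class InnerNextSketch {
    enum Outcome { OK, NONE }  // stand-in for RecordBatch.IterOutcome

    private final Iterator<int[]> upstream;
    private final int chunkSize;
    private Deque<int[]> results;  // null until all input has been sorted
    int[] current;                 // the batch most recently returned

    InnerNextSketch(Iterator<int[]> upstream, int chunkSize) {
        this.upstream = upstream;
        this.chunkSize = chunkSize;
    }

    Outcome innerNext() {
        if (results == null) {
            // First call: drain all upstream batches, then sort everything.
            List<Integer> all = new ArrayList<>();
            while (upstream.hasNext()) {
                for (int v : upstream.next()) {
                    all.add(v);
                }
            }
            int[] sorted = all.stream().mapToInt(Integer::intValue).sorted().toArray();
            // Split the sorted data into fixed-size output chunks.
            results = new ArrayDeque<>();
            for (int i = 0; i < sorted.length; i += chunkSize) {
                results.add(Arrays.copyOfRange(sorted, i, Math.min(i + chunkSize, sorted.length)));
            }
        }
        // Every call (including the first) hands out the next chunk, if any.
        current = results.poll();
        return current == null ? Outcome.NONE : Outcome.OK;
    }
}
```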
      • getWritableBatch

        public WritableBatch getWritableBatch()
        Description copied from interface: RecordBatch
        Gets a writable version of this batch. Takes over ownership of existing buffers.
        Specified by:
        getWritableBatch in interface RecordBatch
        Overrides:
        getWritableBatch in class AbstractRecordBatch<ExternalSort>
      • cancelIncoming

        protected void cancelIncoming()
        Specified by:
        cancelIncoming in class AbstractRecordBatch<ExternalSort>
      • close

        public void close()
        Extreme paranoia to avoid leaving resources unclosed in the case of an error. Since generally only the first error is of interest, we track only the first exception, not potential cascading downstream exceptions.

        Some Drill code ends up calling close() two or more times. The code here protects itself from these undesirable semantics.

        Specified by:
        close in interface AutoCloseable
        Overrides:
        close in class AbstractRecordBatch<ExternalSort>
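The defensive pattern described for close() (close everything, keep only the first failure, tolerate repeated calls) can be sketched in plain Java. ParanoidCloser is a hypothetical class, not part of Drill.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an idempotent close that reports only the first error.
class ParanoidCloser implements AutoCloseable {
    private final List<AutoCloseable> resources;
    private boolean closed;

    ParanoidCloser(List<AutoCloseable> resources) {
        this.resources = new ArrayList<>(resources);
    }

    @Override
    public void close() {
        if (closed) {
            return;  // second and later calls do nothing
        }
        closed = true;
        RuntimeException first = null;
        for (AutoCloseable r : resources) {
            try {
                r.close();
            } catch (Exception e) {
                // Keep only the first failure; later (cascading) ones are dropped.
                if (first == null) {
                    first = (e instanceof RuntimeException)
                        ? (RuntimeException) e
                        : new RuntimeException(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```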
      • retainSv4OnNone

        public static void retainSv4OnNone(RecordBatch incoming)
        Workaround for DRILL-5656. We wish to release the batches for an in-memory sort once data is delivered. Normally, we can release them just before returning NONE, but the StreamingAggBatch requires that the batches still be present on NONE. This method "sniffs" the input provided and, if it is an external sort, sets a mode that retains the batches. Yes, it is a horrible hack, but it is necessary until the Drill iterator protocol can be revised.
        Parameters:
        incoming - the incoming batch for some operator, which may (or may not) be an external sort (or an external sort wrapped in a batch iterator validator).
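The "sniffing" described above amounts to an optional unwrap followed by a type check. In this sketch, SketchBatch, SketchSortBatch, SketchValidatorBatch, and the retainInMemoryBatchesOnNone flag are hypothetical stand-ins for the real operator and validator classes.

```java
// Hypothetical stand-ins, not Drill classes.
interface SketchBatch { }

class SketchSortBatch implements SketchBatch {
    boolean retainInMemoryBatchesOnNone;  // hypothetical "retain" mode flag
}

class SketchValidatorBatch implements SketchBatch {
    final SketchBatch wrapped;  // the operator being validated
    SketchValidatorBatch(SketchBatch wrapped) { this.wrapped = wrapped; }
}

class RetainSketch {
    static void retainSv4OnNone(SketchBatch incoming) {
        // Look through a validator wrapper, if there is one.
        if (incoming instanceof SketchValidatorBatch) {
            incoming = ((SketchValidatorBatch) incoming).wrapped;
        }
        // Only a sort gets the retain mode; any other operator is left alone.
        if (incoming instanceof SketchSortBatch) {
            ((SketchSortBatch) incoming).retainInMemoryBatchesOnNone = true;
        }
    }
}
```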
      • releaseBatches

        public static void releaseBatches(RecordBatch incoming)
      • dump

        public void dump()
        Description copied from interface: RecordBatch
        Perform dump of this batch's state to logs.