Class ExternalSortBatch
java.lang.Object
org.apache.drill.exec.record.AbstractRecordBatch<ExternalSort>
org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch
- All Implemented Interfaces:
AutoCloseable, Iterable<VectorWrapper<?>>, CloseableRecordBatch, RecordBatch, VectorAccessible
External sort batch: a sort batch which can spill to disk in
order to operate within a defined memory footprint.
Basic Operation
The operator has three key phases:
- The load phase in which batches are read from upstream.
- The merge phase in which spilled batches are combined to reduce the number of files below the configured limit. (Best practice is to configure the system to avoid this phase.)
- The delivery phase in which batches are combined to produce the final output.
During the load phase:
- The incoming (upstream) operator provides a series of batches.
- This operator sorts each batch, and accumulates them in an in-memory buffer.
- If the in-memory buffer becomes too large, this operator selects a subset of the buffered batches to spill.
- Each spill set is merged to create a new, sorted collection of batches, which is then spilled to disk.
- To allow the use of multiple disks, each spill group is written round-robin to a set of spill directories.
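The load-phase steps above can be sketched in Java as follows. This is a simplified, hypothetical model rather than Drill's implementation: a batch is a plain int[], memory is estimated at four bytes per value, the "subset" chosen for spilling is assumed to be the oldest half of the buffer, and spilling only records the merged run and the directory it would be written to. The class LoadPhaseSketch and its members are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the load phase (not Drill's code): a "batch" is an
// int[], and "spilling" just records the run and its target directory.
class LoadPhaseSketch {
    private final long memoryLimitBytes;   // assumed in-memory budget
    private final List<String> spillDirs;  // e.g. the configured spill directories
    private final List<int[]> buffered = new ArrayList<>();
    private long bufferedBytes = 0;
    private int nextDir = 0;               // round-robin cursor into spillDirs
    final List<int[]> spilledRuns = new ArrayList<>();
    final List<String> spillTargets = new ArrayList<>();

    LoadPhaseSketch(long memoryLimitBytes, List<String> spillDirs) {
        if (spillDirs.isEmpty()) {
            throw new IllegalArgumentException("at least one spill directory is required");
        }
        this.memoryLimitBytes = memoryLimitBytes;
        this.spillDirs = spillDirs;
    }

    // Sort the incoming batch, buffer it, and spill if the buffer grows too large.
    void addBatch(int[] batch) {
        int[] sorted = batch.clone();
        Arrays.sort(sorted);
        buffered.add(sorted);
        bufferedBytes += (long) sorted.length * Integer.BYTES;
        if (bufferedBytes > memoryLimitBytes) {
            spillOldestHalf();
        }
    }

    int bufferedBatchCount() {
        return buffered.size();
    }

    // Combine the oldest half of the buffer into one sorted run and "spill" it.
    private void spillOldestHalf() {
        int count = Math.max(1, buffered.size() / 2);
        List<int[]> spillSet = buffered.subList(0, count);
        int[] run = spillSet.stream().flatMapToInt(b -> Arrays.stream(b)).toArray();
        Arrays.sort(run); // stand-in for a streaming merge of already-sorted batches
        for (int[] b : spillSet) {
            bufferedBytes -= (long) b.length * Integer.BYTES;
        }
        spillSet.clear(); // removes the spilled batches from the buffer
        spilledRuns.add(run);
        spillTargets.add(spillDirs.get(nextDir));
        nextDir = (nextDir + 1) % spillDirs.size();
    }
}
```

With a 32-byte budget and four-value batches, the third and fourth batches each trigger a spill, and the two runs land in "dirA" and then "dirB".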
Data is spilled to disk as a "run". A run consists of one or more (typically many) batches, each of which is itself a sorted run of records.
During the sort/merge phase:
- When the input operator is complete, this operator merges the accumulated batches (which may be all in memory or partially on disk), and returns them to the output (downstream) operator in chunks of no more than 64K records.
- The final merge must combine a collection of in-memory and spilled batches. Several limits apply to the maximum "width" of this merge. For example, each open spill run consumes a file handle, and we may wish to limit the number of file handles. Further, memory must hold one batch from each run, so we may need to reduce the number of runs so that the remaining runs can fit into memory. A consolidation phase combines in-memory and spilled batches prior to the final merge to control final merge width.
- A special case occurs if no batches were spilled. In this case, the input batches are sorted in memory without merging.
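The final merge, and the consolidation step that keeps its width under control, can be sketched as a k-way merge over sorted runs. In this hypothetical sketch, runs are int[] arrays, the mergeLimit parameter stands in for a configured limit on merge width, and each consolidation pass merges the smallest runs first. That policy and the names MergeSketch, kWayMerge, and mergeWithLimit are assumptions, not Drill's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: k-way merge of sorted runs, plus consolidation passes
// that keep the final merge no wider than mergeLimit runs.
class MergeSketch {

    // Merge any number of sorted runs into one sorted run using a min-heap.
    static int[] kWayMerge(List<int[]> runs) {
        // Heap entries are {value, runIndex, positionInRun}, ordered by value.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        int total = 0;
        for (int r = 0; r < runs.size(); r++) {
            int[] run = runs.get(r);
            total += run.length;
            if (run.length > 0) {
                heap.add(new int[] {run[0], r, 0});
            }
        }
        int[] out = new int[total];
        int i = 0;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out[i++] = top[0];
            int[] run = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < run.length) {
                heap.add(new int[] {run[next], top[1], next});
            }
        }
        return out;
    }

    // Consolidate until at most mergeLimit runs remain, then do the final merge.
    static int[] mergeWithLimit(List<int[]> runs, int mergeLimit) {
        if (mergeLimit < 2) {
            throw new IllegalArgumentException("mergeLimit must be at least 2");
        }
        List<int[]> pending = new ArrayList<>(runs);
        while (pending.size() > mergeLimit) {
            // Assumed policy: merge the mergeLimit smallest runs into one.
            pending.sort((a, b) -> Integer.compare(a.length, b.length));
            List<int[]> group = new ArrayList<>(pending.subList(0, mergeLimit));
            pending.subList(0, mergeLimit).clear();
            pending.add(kWayMerge(group));
        }
        return kWayMerge(pending);
    }
}
```

The heap holds one entry per open run, which mirrors why each run costs a file handle and one batch of memory during the merge.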
Many complex details are involved in doing the above; the details are explained in the methods of this class.
Configuration Options
- drill.exec.sort.external.spill.fs: The file system (file://, hdfs://, etc.) of the spill directory.
- drill.exec.sort.external.spill.directories: The comma-delimited list of directories, on the above file system, to which spill files are written in round-robin fashion. The query will fail if any one of the directories becomes full.
- drill.exec.sort.external.spill.file_size: Target size for first-generation spill files. Set this large enough to get nice long writes, but not so large that spill directories are overwhelmed.
- drill.exec.sort.external.mem_limit: Maximum memory to use for the in-memory buffer. (Primarily for testing.)
- drill.exec.sort.external.batch_limit: Maximum number of batches to hold in memory. (Primarily for testing.)
- drill.exec.sort.external.spill.max_count: Maximum number of batches to add to "first generation" files. Defaults to 0 (no limit). (Primarily for testing.)
- drill.exec.sort.external.spill.min_count: Minimum number of batches to add to "first generation" files. Defaults to 0 (no limit). (Primarily for testing.)
- drill.exec.sort.external.merge_limit: Sets the maximum number of runs to be merged in a single pass (limits the number of open files).
The memory limit observed by this operator is the lesser of:
- The maximum allocation allowed by the allocator assigned to this batch, as set by the Foreman, or
- The maximum limit configured in the mem_limit parameter above. (Primarily for testing.)
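The "lesser of" rule can be expressed as a small helper. This sketch uses java.util.Properties as a stand-in for Drill's configuration and treats a missing or non-positive mem_limit as "not set"; both choices, and the name MemoryLimitSketch, are illustrative assumptions.

```java
import java.util.Properties;

// Hypothetical helper (not Drill's code). Properties stands in for Drill's
// configuration; a missing or non-positive mem_limit is treated as "not set".
class MemoryLimitSketch {
    static final String MEM_LIMIT_KEY = "drill.exec.sort.external.mem_limit";

    // allocatorMax: the maximum allocation granted to this operator's allocator.
    static long effectiveLimit(long allocatorMax, Properties config) {
        long configured = Long.parseLong(config.getProperty(MEM_LIMIT_KEY, "0").trim());
        if (configured <= 0) {
            return allocatorMax; // no override: the allocator's limit applies
        }
        return Math.min(allocatorMax, configured);
    }
}
```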
Output
It is helpful to note that the sort operator will produce one of two kinds of output batches:
- A large output with an SV4 if data is sorted in memory. The SV4 addresses the entire in-memory sort set. A selection vector remover will copy results into new batches of a size determined by that operator.
- A series of batches, without a selection vector, if the sort spills to disk. In this case, the downstream operator will still be a selection vector remover, but there is nothing for that operator to remove.
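The in-memory case can be pictured as follows. In this hypothetical sketch, each 4-byte SV4 entry packs a batch index in the upper 16 bits and a record index in the lower 16 bits (an assumed layout), so one int array can address records across all buffered batches; the "remover" then copies the selected values into densely packed output batches. Sv4Sketch and its methods are illustrative names, not Drill's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of SV4 output: entries are (batchIndex << 16) | recordIndex.
class Sv4Sketch {
    static int encode(int batchIndex, int recordIndex) {
        return (batchIndex << 16) | recordIndex;
    }

    // Resolve an SV4 entry to the value it points at.
    static int value(List<int[]> batches, int sv4Entry) {
        return batches.get(sv4Entry >>> 16)[sv4Entry & 0xFFFF];
    }

    // Build an SV4 that visits every record of every batch in sorted order,
    // without moving any data.
    static int[] sortedSv4(List<int[]> batches) {
        List<Integer> entries = new ArrayList<>();
        for (int b = 0; b < batches.size(); b++) {
            for (int r = 0; r < batches.get(b).length; r++) {
                entries.add(encode(b, r));
            }
        }
        entries.sort((x, y) -> Integer.compare(value(batches, x), value(batches, y)));
        return entries.stream().mapToInt(Integer::intValue).toArray();
    }

    // What a selection vector remover does: copy the selected values into
    // new batches of at most outputBatchSize records each.
    static List<int[]> removeSelectionVector(List<int[]> batches, int[] sv4, int outputBatchSize) {
        List<int[]> out = new ArrayList<>();
        for (int start = 0; start < sv4.length; start += outputBatchSize) {
            int end = Math.min(sv4.length, start + outputBatchSize);
            int[] chunk = new int[end - start];
            for (int i = start; i < end; i++) {
                chunk[i - start] = value(batches, sv4[i]);
            }
            out.add(chunk);
        }
        return out;
    }
}
```

In the spilled case there is no SV4 to resolve: the merged batches are already in order, which is why the downstream remover has nothing to do.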
Logging
Logging in this operator serves two purposes:
- Normal diagnostic information.
- Capturing the essence of the operator functionality for analysis in unit tests.
Nested Class Summary
Nested classes/interfaces inherited from class org.apache.drill.exec.record.AbstractRecordBatch
AbstractRecordBatch.BatchState
Nested classes/interfaces inherited from interface org.apache.drill.exec.record.RecordBatch
RecordBatch.IterOutcome
Field Summary
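Modifier and Type | Field
protected static final ControlsInjector | injector
static final String | INTERRUPTION_AFTER_SETUP
static final String | INTERRUPTION_AFTER_SORT
static final String | INTERRUPTION_WHILE_MERGING
static final String | INTERRUPTION_WHILE_SPILLING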
Fields inherited from class org.apache.drill.exec.record.AbstractRecordBatch
batchStatsContext, container, context, oContext, popConfig, state, stats, unionTypeEnabled
Fields inherited from interface org.apache.drill.exec.record.RecordBatch
MAX_BATCH_ROW_COUNT
Constructor Summary
Constructor
ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming)
Method Summary
Modifier and Type | Method | Description
void | buildSchema() | Called by AbstractRecordBatch as a fast-path to obtain the first record batch and set up the schema of this batch in order to quickly return the schema to the client.
protected void | cancelIncoming() |
void | close() | Extreme paranoia to avoid leaving resources unclosed in the case of an error.
void | dump() | Perform dump of this batch's state to logs.
int | getRecordCount() | Get the number of records.
SelectionVector2 | getSelectionVector2() |
SelectionVector4 | getSelectionVector4() | Returns outputSV4 instead of resultsIterator sv4.
WritableBatch | getWritableBatch() | Gets a writable version of this batch.
RecordBatch.IterOutcome | innerNext() | Process each request for a batch.
static void | releaseBatches(RecordBatch incoming) |
static void | retainSv4OnNone(RecordBatch incoming) | Workaround for DRILL-5656.
Methods inherited from class org.apache.drill.exec.record.AbstractRecordBatch
cancel, checkContinue, getContainer, getContext, getOutgoingContainer, getPopConfig, getRecordBatchStatsContext, getSchema, getValueAccessorById, getValueVectorId, isRecordBatchStatsLoggingEnabled, iterator, next, next, next, schemaChangeException, schemaChangeException
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface java.lang.Iterable
forEach, spliterator
Field Details
INTERRUPTION_AFTER_SORT
INTERRUPTION_AFTER_SETUP
INTERRUPTION_WHILE_SPILLING
INTERRUPTION_WHILE_MERGING
injector
Constructor Details
ExternalSortBatch
ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming)
Method Details
getRecordCount
public int getRecordCount()
Description copied from interface: VectorAccessible
Get the number of records.
- Returns: number of records
getSelectionVector4
Returns outputSV4 instead of the resultsIterator SV4. For a resultsIterator that has a null SV4, outputSV4 will be empty. However, Sort with an EMIT outcome is expected to fail in those cases, such as spilling scenarios, while preparing the output container, since that combination is not currently supported.
- Specified by: getSelectionVector4 in interface VectorAccessible
- Overrides: getSelectionVector4 in class AbstractRecordBatch<ExternalSort>
getSelectionVector2
- Specified by: getSelectionVector2 in interface VectorAccessible
- Overrides: getSelectionVector2 in class AbstractRecordBatch<ExternalSort>
buildSchema
public void buildSchema()
Called by AbstractRecordBatch as a fast-path to obtain the first record batch and set up the schema of this batch in order to quickly return the schema to the client. Note that this method fetches the first batch from upstream, which will be waiting for us the first time that innerNext() is called.
- Overrides: buildSchema in class AbstractRecordBatch<ExternalSort>
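The "fetch the first batch early, then hand it to innerNext()" pattern resembles a peeking iterator. This is a hypothetical sketch over a plain java.util.Iterator, not Drill's operator protocol; FirstBatchPeekSketch, buildSchema, and nextBatch are illustrative names, and null is assumed to signal the end of input.

```java
import java.util.Iterator;

// Hypothetical sketch: buildSchema() pulls the first upstream batch so the
// caller can learn the schema, and keeps it so the first nextBatch() call
// returns it instead of fetching a new one.
class FirstBatchPeekSketch<B> {
    private final Iterator<B> upstream;
    private B pending; // batch fetched by buildSchema() and not yet consumed

    FirstBatchPeekSketch(Iterator<B> upstream) {
        this.upstream = upstream;
    }

    B buildSchema() {
        if (pending == null && upstream.hasNext()) {
            pending = upstream.next();
        }
        return pending; // the caller derives the schema from this batch
    }

    // Returns the held batch first, then continues from upstream; null means done.
    B nextBatch() {
        if (pending != null) {
            B batch = pending;
            pending = null;
            return batch;
        }
        return upstream.hasNext() ? upstream.next() : null;
    }
}
```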
innerNext
Process each request for a batch. The first request retrieves all the incoming batches and sorts them, optionally spilling to disk as needed. Subsequent calls retrieve the sorted results in fixed-size batches.
- Specified by: innerNext in class AbstractRecordBatch<ExternalSort>
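The request protocol described above can be modeled as a small state machine. This hypothetical sketch keeps everything in memory (no spilling), uses its own Outcome enum rather than Drill's RecordBatch.IterOutcome, and assumes batches are int[] arrays; InnerNextSketch and its members are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of innerNext(): the first call drains and sorts all
// input, later calls hand out fixed-size slices, and NONE signals the end.
class InnerNextSketch {
    enum Outcome { OK, NONE }

    private final Iterator<int[]> upstream;
    private final int outputBatchSize;
    private int[] sorted;            // null until the first call loads the input
    private int position = 0;        // next unread index into sorted
    private int[] current = new int[0];

    InnerNextSketch(Iterator<int[]> upstream, int outputBatchSize) {
        this.upstream = upstream;
        this.outputBatchSize = outputBatchSize;
    }

    int[] currentBatch() {
        return current;
    }

    Outcome innerNext() {
        if (sorted == null) {
            // First request: read every incoming batch, then sort the whole set.
            List<int[]> all = new ArrayList<>();
            int total = 0;
            while (upstream.hasNext()) {
                int[] batch = upstream.next();
                all.add(batch);
                total += batch.length;
            }
            sorted = new int[total];
            int offset = 0;
            for (int[] batch : all) {
                System.arraycopy(batch, 0, sorted, offset, batch.length);
                offset += batch.length;
            }
            Arrays.sort(sorted);
        }
        if (position >= sorted.length) {
            current = new int[0];
            return Outcome.NONE;
        }
        // Later requests (and the first one) deliver the next fixed-size slice.
        int end = Math.min(sorted.length, position + outputBatchSize);
        current = Arrays.copyOfRange(sorted, position, end);
        position = end;
        return Outcome.OK;
    }
}
```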
getWritableBatch
Description copied from interface: RecordBatch
Gets a writable version of this batch. Takes over ownership of existing buffers.
- Specified by: getWritableBatch in interface RecordBatch
- Overrides: getWritableBatch in class AbstractRecordBatch<ExternalSort>
cancelIncoming
protected void cancelIncoming()
- Specified by: cancelIncoming in class AbstractRecordBatch<ExternalSort>
close
public void close()
Extreme paranoia to avoid leaving resources unclosed in the case of an error. Since generally only the first error is of interest, we track only the first exception, not potential cascading downstream exceptions.
Some Drill code ends up calling close() two or more times. The code here protects itself from these undesirable semantics.
- Specified by: close in interface AutoCloseable
- Overrides: close in class AbstractRecordBatch<ExternalSort>
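The close() policy described above (close everything, keep only the first exception, tolerate repeated calls) can be sketched with plain AutoCloseable resources. ParanoidCloser is a hypothetical name, and wrapping a checked first failure in RuntimeException is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical sketch of the close() pattern: close every resource, remember
// only the first failure, and make repeated close() calls no-ops.
class ParanoidCloser implements AutoCloseable {
    private final List<AutoCloseable> resources;
    private boolean closed = false;

    ParanoidCloser(List<AutoCloseable> resources) {
        this.resources = resources;
    }

    @Override
    public void close() {
        if (closed) {
            return; // tolerate callers that close more than once
        }
        closed = true;
        RuntimeException first = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
                }
                // Later, cascading failures are deliberately ignored.
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```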
retainSv4OnNone
Workaround for DRILL-5656. We wish to release the batches for an in-memory sort once data is delivered. Normally, we can release them just before returning NONE. However, the StreamingAggBatch requires that the batches still be present on NONE. This method "sniffs" the input provided and, if it is an external sort, sets a mode that retains the batches. Yes, it is a horrible hack, but it is necessary until the Drill iterator protocol can be revised.
- Parameters: incoming - the incoming batch for some operator which may (or may not) be an external sort (or an external sort wrapped in a batch iterator validator).
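The "sniffing" step can be illustrated with stand-in types. In this hypothetical sketch, Batch, ValidatorWrapper, SortBatch, and RetainSketch are invented names, not Drill's classes: the helper unwraps an optional validator wrapper and, if the operator underneath is a sort, flips a flag that would keep its batches alive when NONE is returned. Any other input is left untouched.

```java
// Hypothetical stand-ins for an operator, a validator wrapper, and a sort.
interface Batch {}

class ValidatorWrapper implements Batch {
    final Batch inner;

    ValidatorWrapper(Batch inner) {
        this.inner = inner;
    }
}

class SortBatch implements Batch {
    boolean retainOnNone = false; // if true, batches survive past NONE
}

class RetainSketch {
    static void retainSv4OnNone(Batch incoming) {
        Batch b = incoming;
        // Look through one layer of validator wrapping, if present.
        if (b instanceof ValidatorWrapper) {
            b = ((ValidatorWrapper) b).inner;
        }
        // Only a sort is switched into retain mode; other inputs are ignored.
        if (b instanceof SortBatch) {
            ((SortBatch) b).retainOnNone = true;
        }
    }
}
```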
releaseBatches
dump
public void dump()
Description copied from interface: RecordBatch
Perform dump of this batch's state to logs.