Package org.apache.drill.exec.physical.resultSet.impl
The primary purpose of this loader, and the most complex to understand and maintain, is overflow handling.
Detailed Use Cases
Let's examine it by considering a number of use cases.

Row | a | b | c | d | e | f | g | h |
----|---|---|---|---|---|---|---|---|
n-2 | X | X | X | X | X | X | - | - |
n-1 | X | X | X | X |   |   | - | - |
n   | X | ! | O |   | O |   | O |   |
- n-2, n-1, and n are rows. n is the overflow row.
- X indicates a value was written before overflow.
- Blank indicates no value was written in that row.
- ! indicates the value that triggered overflow.
- - indicates a column that did not exist prior to overflow.
- O indicates a value written after overflow.
The scenarios, identified by column names above, are:
- a
- a contains values for all three rows.
- Two values were written in the "main" batch, while a third was written to what becomes the overflow row.
- When overflow occurs, the last write position is at n. It must be moved back to n-1.
- Since data was written to the overflow row, it is copied to the look-ahead batch.
- The last write position in the lookahead batch is 0 (since data was copied into the 0th row).
- When harvesting, no empty-filling is needed. Values in the main batch are zero-filled when the batch is finished, values in the look-ahead batch are back-filled when the first value is written.
- When starting the next batch, the last write position must be set to 0 to reflect the presence of the value for row n.
- b
- b contains values for all three rows. The value for row n triggers
overflow.
- The last write position is at n-1, which is kept for the "main" vector.
- A new overflow vector is created and starts empty, with the last write position at -1.
- Once created, b is immediately written to the overflow vector, advancing the last write position to 0.
- Harvesting and starting the next batch for column b work the same as for column a.
- c
- Column c has values for all rows.
- The value for row n is written after overflow.
- At overflow, the last write position is at n-1.
- At overflow, a new lookahead vector is created with the last write position at -1.
- The value of c is written to the lookahead vector, advancing the last write position to 0.
- Harvesting and starting the next batch for column c work the same as for column a.
- d
- Column d writes values to the last two rows before overflow, but not to the overflow row.
- The last write position for the main batch is at n-1.
- The last write position in the lookahead batch remains at -1.
- No empty-filling is needed at harvest: column d's last value sits in row n-1, the final row of the main batch.
- When starting the next batch, the last write position must be set to -1, indicating no data yet written.
- e
- Column e has a value in row n-2, none in row n-1, and a value written to the overflow row after overflow occurs.
- At overflow, the last write position is at n-2, so nothing is copied to the lookahead vector, whose last write position starts at -1.
- The value written after overflow advances the lookahead last write position to 0.
- Harvesting for column e requires filling an empty value for row n-1.
- When starting the next batch, the last write position is set to 0 to reflect the value for row n.
- f
- Column f has no data in the last position of the main batch, and no data
in the overflow row.
- The last write position is at n-2.
- An empty value must be written into position n-1 during harvest.
- On start of the next batch, the last write position starts at -1.
- g
- Column g is added after overflow, and has a value written to the overflow
row.
- On harvest, column g is simply skipped.
- On start of the next batch, the last write position can be left unchanged since no "exchange" was done.
- h
- Column h is added after overflow, but does not have data written to it during the overflow row. Similar to column g, but the last write position starts at -1 for the next batch.
General Rules
The above can be summarized into a smaller set of rules. At the time of overflow on row n:
- Create or clear the lookahead vector.
- Copy (last write position - n + 1) values from position n in the old vector to position 0 in the new one. If the copy count is negative, copy nothing. (A negative copy count means that the last write position is behind the current row position; this should not occur after back-filling.)
- Save the last write position from the old vector, clamped at n-1. (That is, if the last write position is before n, keep it. If at n, set it back to n-1.)
- Set the last write position of the overflow vector to (original last write position - n), clamped at -1. That is, if the original last write position was before n, the new one is -1. If the original last write position is after n, shift it down by n places.
- Swap buffers from the main vectors and the overflow vectors. This sets aside the main values, and allows writing to continue using the overflow buffers.
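The copy count and the two clamped positions reduce to three small formulas. A sketch follows; the class and method names are invented for illustration and are not Drill's actual API:

```java
// Illustrative arithmetic for the overflow rules above; not Drill's actual API.
public class OverflowMath {

  // Number of values to copy from the old vector into the look-ahead vector.
  // A negative difference means nothing was written to the overflow row.
  public static int copyCount(int lastWrite, int n) {
    return Math.max(lastWrite - n + 1, 0);
  }

  // Last write position retained for the main vector, clamped at n-1.
  public static int mainLastWrite(int lastWrite, int n) {
    return Math.min(lastWrite, n - 1);
  }

  // Last write position for the look-ahead vector: shifted down by n,
  // clamped at -1 (empty).
  public static int overflowLastWrite(int lastWrite, int n) {
    return Math.max(lastWrite - n, -1);
  }
}
```

For column a above (last write at n), one value is copied and the look-ahead position becomes 0; for column b (last write at n-1), nothing is copied and the look-ahead position starts at -1.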
As the overflow write proceeds:
- For existing columns, write as normal. The last write position moves from -1 to 0.
- Columns not written leave the last write position at -1.
- If a new column appears, set its last write position to -1. If it is then written, proceed as in the first point above.
At harvest time:
- For every writer, save the last write position.
- Swap the overflow and main buffers to put the main batch back into the main vectors.
- Reset the last write position for all writers to the values saved at overflow time above.
- Finish the batch for the main vectors as normal. No special handling needed.
When starting the next batch:
- Swap buffers again, putting the overflow row back into the main vectors. (At this point, the harvested vectors should all have zero buffers.)
- Restore the last write position saved during harvest.
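The whole exchange can be modeled in miniature. The toy sketch below uses invented names and plain int arrays standing in for vector buffers; it walks one column through overflow, harvest, and the start of the next batch:

```java
// Toy walk-through of the overflow protocol for a single column.
// Plain int arrays stand in for vector buffers; names are illustrative.
public class OverflowDemo {
  int[] mainBuf = new int[8];  // buffer currently bound to the vector
  int[] lookAhead = new int[8];
  int lastWrite = -1;          // index of the last value written
  int savedMainLastWrite;      // saved at overflow time
  int savedOverflowLastWrite;  // saved at harvest time

  void write(int row, int value) { mainBuf[row] = value; lastWrite = row; }

  void swapBuffers() {         // pointer exchange only; no copying
    int[] t = mainBuf; mainBuf = lookAhead; lookAhead = t;
  }

  // Overflow on row n: copy the overflow row's values, clamp positions, swap.
  void overflow(int n) {
    int copyCount = Math.max(lastWrite - n + 1, 0);
    for (int i = 0; i < copyCount; i++) {
      lookAhead[i] = mainBuf[n + i];
    }
    savedMainLastWrite = Math.min(lastWrite, n - 1);
    lastWrite = Math.max(lastWrite - n, -1); // position in the look-ahead row
    swapBuffers(); // writing continues into the old look-ahead buffer
  }

  // Harvest: save the overflow position, expose the main batch again.
  int[] harvest() {
    savedOverflowLastWrite = lastWrite;
    swapBuffers();
    lastWrite = savedMainLastWrite;
    return mainBuf; // rows 0 .. lastWrite form the harvested batch
  }

  // Start the next batch with the overflow row back in the main vector.
  void startNextBatch() {
    swapBuffers();
    lastWrite = savedOverflowLastWrite;
  }
}
```

Note that the only loop in the sketch is the copy of the overflow row; every other step is a pointer swap or an index assignment, mirroring the constraint that swapping is not copying.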
Constraints
A number of constraints are worth pointing out:
- Writers are bound to vectors, so we can't easily swap vectors during overflow.
- The project operator to which this operator feeds data also binds to vectors, so the same set of vectors must be presented on every batch.
- The client binds to writers, so we cannot swap writers between main and overflow batches.
- Therefore, the unit of swapping is the buffer that backs the vectors.
- Swapping is not copying; it is only exchanging pointers.
- The only copying in this entire process occurs when moving previously-written values in the overflow row to the new vector at the time of overflow.
Arrays
The above covers the case of scalar, top-level columns. The extension to scalar maps is straightforward: at run time, the members of maps are just simple scalar vectors that reside in a map name space, but the structure of map fields is the same as for top-level fields. (Think of map fields as being "flattened" into the top-level tuple.)

Arrays are a different matter: each row can have many values associated with it. Consider an array of scalars. We have:
    Row 0    Row 1      Row 2
   0 1 2    3 4 5      6 7 8
[ [a b c]  [d e f]  |  [g h i] ]
Here, the letters indicate values. The brackets show the overall vector
(outer brackets) and individual rows (inner brackets). The vertical line
shows where overflow occurred. The same rules as discussed earlier still
apply, but we must consider both the row indexes and the array indexes.
- Overflow occurs at the row level. Here row 2 overflowed and must be moved to the look-ahead vector.
- Value movement occurs at the value level. Here, values 6, 7 and 8 must be moved to the look-ahead vector.
    Row 0    Row 1          Row 0
   0 1 2    3 4 5          0 1 2
[ [a b c]  [d e f] ]   [ [g h i] ]
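Assuming the usual offset-vector encoding for arrays (offsets[r] through offsets[r+1] bound the values of row r), the value movement shown above can be sketched as follows; the names are illustrative, not Drill's actual API:

```java
// Sketch of rolling over an array column at overflow, assuming an
// offset-vector encoding: offsets[r] .. offsets[r+1] bound row r's values.
public class ArrayRollover {

  // Returns the look-ahead values for overflow row n, e.g. [g, h, i] above.
  public static char[] rollover(char[] values, int[] offsets, int n) {
    int start = offsets[n];   // first value index of the overflow row
    int end = offsets[n + 1]; // one past the last value of that row
    char[] lookAhead = new char[end - start];
    for (int i = start; i < end; i++) {
      lookAhead[i - start] = values[i]; // values 6, 7, 8 move to 0, 1, 2
    }
    return lookAhead;
  }
}
```

The look-ahead offset vector restarts at zero: for the example, it becomes [0, 3], describing a single row of three values.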
Further, we must consider lists: a column may consist of a list of
arrays. Or, a column may consist of an array of maps, one of which is
a list of arrays. So, the above reasoning must apply recursively down
the value tree.
As it turns out, a simple recursive algorithm, an extension of the reasoning for the top-level scalar case, can handle arrays:
- Start with the row index of the overflow row.
- If column c, say, is an array, obtain the index of the first value for the overflow row.
- If c is a list, or a repeated map, then repeat the above, for each member of c (a single column for a list, a set of columns for a map), but replace the row index with the index of the first element.
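The recursion above can be sketched with a hypothetical column-tree model (invented classes, not Drill's actual writer or vector types):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical column-tree model for the recursive rule above. An array
// column carries per-position offsets; a scalar column has none.
public class RolloverWalk {

  static class Column {
    final int[] offsets; // null for non-array columns
    final List<Column> children = new ArrayList<>();
    Column(int[] offsets) { this.offsets = offsets; }
  }

  // Top-down traversal: visit each column with the index at which its
  // overflow data starts, replacing the row index with the first-element
  // index whenever the column is an array.
  static void startIndexes(Column col, int pos, List<Integer> out) {
    int childPos = (col.offsets == null) ? pos : col.offsets[pos];
    out.add(childPos);
    for (Column child : col.children) {
      startIndexes(child, childPos, out); // recurse with the faster index
    }
  }
}
```

For example, a repeated map whose rows 0-2 start at elements 0, 2 and 5 contains a nested array; overflow at row 2 starts at element 5 of the map, and the nested array's overflow data starts at its own offsets[5].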
Resynching Writers after Overflow
When an overflow occurs, our focus starts with the single top-level row that will not fit into the current batch. We move this row to the look-ahead vectors. Doing so is quite simple when each row is a simple tuple. As described above, the work is quite a bit more complex when the structure is a JSON-like tree flattened into vectors.

Consider the writers. Each writer corresponds to a single vector. Writers are grouped into logical tree nodes. Those in the root node write to (single, scalar) columns that are either top-level columns, or nested some level down in single-value (not array) tuples. Another tree level occurs in an array: the elements of the array use a different (faster-changing) index than the top (row-level) writers. Different arrays have different indexes: a row may have, say, four elements in array A, but 20 elements in array B.
Further, arrays can be singular (a repeated int, say) or for an entire tuple (a repeated map.) And, since Drill supports the full JSON model, in the most general case, there is a tree of array indexes that can be nested to an arbitrary level. (A row can have an array of maps which contains a column that is, itself, a list of repeated maps, a field of which is an array of ints.)
Writers handle this index tree via a tree of ColumnWriterIndex
objects, often specialized for various tasks.
Now we can get to the key concept in this section: how we update those indexes after an overflow. The top-level index reverts to zero. (We start writing the 0th row in the new look-ahead batch.) But nested indexes (those for arrays) will start at some other position depending on the number of elements already written in the overflow row. The number of such elements is determined by a top-down traversal of the tree (to determine the start offset of each array for the row). Resetting the writer indexes is a bottom-up process: based on the number of elements in that array, the writer index is reset to match.
This flow is the opposite of the "normal" case in which a new batch is started top-down, with each index being reset to zero.
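The reset arithmetic itself is tiny. A sketch with invented names:

```java
// Sketch of the post-rollover index reset described above; invented names.
public class WriterResync {

  // The row-level index always restarts at 0 for the look-ahead batch.
  public static int rowIndex() {
    return 0;
  }

  // A nested array index restarts at the count of values that belong to the
  // overflow row: everything written at or after the row's start offset.
  public static int nestedIndex(int valuesWritten, int overflowStartOffset) {
    return valuesWritten - overflowStartOffset;
  }
}
```

In the array example above, nine values (indexes 0-8) were written and the overflow row starts at value 6, so the nested writer index restarts at 3, just past the copied values g, h and i.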
The Need for a Uniform Structure
Drill has vastly different implementations and interfaces for:
- Result sets (as a VectorContainer),
- Arrays (as a generated repeated vector),
- Lists (as a ListVector),
- Repeated lists (as a RepeatedListVector), and
- Repeated maps (as a RepeatedMapVector).

This code hides those differences behind the TupleModel abstraction. In particular, we use the single tuple model, which works with a single batch. This model provides a simple, uniform interface to work with columns and tuples (rows, maps), and a simple way to work with arrays. This interface reduces the above array algorithm to a simple set of recursive method calls.