Package org.apache.drill.exec.physical.resultSet.impl


Handles the details of the result set loader implementation.

The primary purpose of this loader, and the most complex to understand and maintain, is overflow handling.

Detailed Use Cases

Let's examine it by considering a number of use cases.

    Row   a  b  c  d  e  f  g  h
    n-2   X  X  X  X  X  X  -  -
    n-1   X  X  X  X        -  -
    n     X  !  O     O     O

Here:
  • n-2, n-1, and n are rows. n is the overflow row.
  • X indicates a value was written before overflow.
  • Blank indicates no value was written in that row.
  • ! indicates the value that triggered overflow.
  • - indicates a column that did not exist prior to overflow.
  • O indicates a value written after overflow.
Column a is written before overflow occurs, b causes overflow, and all other columns either are not written, or written after overflow.

The scenarios, identified by column names above, are:

a
a contains values for all three rows.
  • Two values were written in the "main" batch, while a third was written to what becomes the overflow row.
  • When overflow occurs, the last write position is at n. It must be moved back to n-1.
  • Since data was written to the overflow row, it is copied to the look-ahead batch.
  • The last write position in the look-ahead batch is 0 (since data was copied into the 0th row).
  • When harvesting, no empty-filling is needed. Values in the main batch are zero-filled when the batch is finished; values in the look-ahead batch are back-filled when the first value is written.
  • When starting the next batch, the last write position must be set to 0 to reflect the presence of the value for row n.
b
b contains values for all three rows. The value for row n triggers overflow.
  • The last write position is at n-1, which is kept for the "main" vector.
  • A new overflow vector is created and starts empty, with the last write position at -1.
  • Once created, b is immediately written to the overflow vector, advancing the last write position to 0.
  • Harvesting and starting the next batch for column b work the same as for column a.
c
Column c has values for all rows.
  • The value for row n is written after overflow.
  • At overflow, the last write position is at n-1.
  • At overflow, a new lookahead vector is created with the last write position at -1.
  • The value of c is written to the lookahead vector, advancing the last write position from -1 to 0.
  • Harvesting and starting the next batch for column c work the same as for column a.
d
Column d writes values to the last two rows before overflow, but not to the overflow row.
  • The last write position for the main batch is at n-1.
  • The last write position in the lookahead batch remains at -1.
  • Harvesting for column d requires filling an empty value for row n-1.
  • When starting the next batch, the last write position must be set to -1, indicating no data yet written.
f
Column f has no data in the last position of the main batch, and no data in the overflow row.
  • The last write position is at n-2.
  • An empty value must be written into position n-1 during harvest.
  • On start of the next batch, the last write position starts at -1.
g
Column g is added after overflow, and has a value written to the overflow row.
  • On harvest, column g is simply skipped.
  • On start of the next batch, the last write position can be left unchanged since no "exchange" was done.
h
Column h is added after overflow, but does not have data written to it during the overflow row. Similar to column g, but the last write position starts at -1 for the next batch.

General Rules

The above can be summarized into a smaller set of rules:

At the time of overflow on row n:

  • Create or clear the lookahead vector.
  • Copy (last write position - n + 1) values from row n in the old vector to position 0 in the new one. If the copy count is negative, copy nothing. (A negative copy count means that the last write position is behind the current row position; this should not occur after back-filling.)
  • Save the last write position from the old vector, clamped at n. (That is, if the last write position is before n, keep it. If at n+1, set it back to n.)
  • Set the last write position of the overflow vector to (original last write position - n), clamped at -1. That is, if the original last write position was before n, the new one is -1. If the original last write position is after n, shift it down by n places.
  • Swap buffers from the main vectors and the overflow vectors. This sets aside the main values, and allows writing to continue using the overflow buffers.
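
The position arithmetic in the rules above can be sketched as follows. This is a minimal illustration that follows the rules exactly as worded, not the loader's actual code: the class OverflowPositions and its field names are hypothetical and do not appear in Drill.

```java
/** Hypothetical helper: positions computed for one column when row n overflows. */
final class OverflowPositions {
  final int copyCount;          // values to copy from row n into the look-ahead vector
  final int mainLastWrite;      // last write position saved for the main vector
  final int overflowLastWrite;  // last write position in the look-ahead vector

  private OverflowPositions(int copyCount, int mainLastWrite, int overflowLastWrite) {
    this.copyCount = copyCount;
    this.mainLastWrite = mainLastWrite;
    this.overflowLastWrite = overflowLastWrite;
  }

  /** Applies the "at the time of overflow" rules for overflow row n. */
  static OverflowPositions atOverflow(int lastWritePos, int n) {
    // A negative copy count means nothing was written to row n: copy nothing.
    int copyCount = Math.max(0, lastWritePos - n + 1);
    // Keep the old position if it is before n; otherwise clamp it at n.
    int mainLastWrite = Math.min(lastWritePos, n);
    // Shift down by n places, but never below -1 ("nothing written yet").
    int overflowLastWrite = Math.max(-1, lastWritePos - n);
    return new OverflowPositions(copyCount, mainLastWrite, overflowLastWrite);
  }
}
```

For a column already written in the overflow row (last write position equal to n), this yields one value to copy and a look-ahead position of 0. For a column last written in row n-1, it yields nothing to copy and a look-ahead position of -1.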

As the overflow write proceeds:

  • For existing columns, write as normal. The last write position moves from -1 to 0.
  • Columns not written leave the last write position at -1.
  • If a new column appears, set its last write position to -1. If it is then written, proceed as in the first point above.

At harvest time:

  • For every writer, save the last write position.
  • Swap the overflow and main buffers to put the main batch back into the main vectors.
  • Reset the last write position for all writers to the values saved at overflow time above.
  • Finish the batch for the main vectors as normal. No special handling needed.

When starting the next batch:

  • Swap buffers again, putting the overflow row back into the main vectors. (At this point, the harvested vectors should all have zero buffers.)
  • Restore the last write position saved during harvest.
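
The sequence of swaps and saved positions can be illustrated with a toy column. This is only a sketch under simplifying assumptions: SwappableColumn is a hypothetical class, plain int arrays stand in for vector buffers, and the positions passed to overflow() are assumed to have been computed by the overflow-time rules.

```java
/** Hypothetical column whose backing buffer can be exchanged without copying. */
final class SwappableColumn {
  int[] mainBuffer = new int[4];      // buffer currently bound to the "vector"
  int[] overflowBuffer = new int[4];  // buffer set aside (look-ahead or main)
  int lastWritePos = -1;
  private int savedPos = -1;

  void write(int row, int value) {
    mainBuffer[row] = value;
    lastWritePos = row;
  }

  /** Exchanges references only; no values are copied. */
  private void swapBuffers() {
    int[] tmp = mainBuffer;
    mainBuffer = overflowBuffer;
    overflowBuffer = tmp;
  }

  /** Overflow: set the main values aside and continue writing to look-ahead. */
  void overflow(int mainPos, int overflowPos) {
    savedPos = mainPos;
    swapBuffers();
    lastWritePos = overflowPos;
  }

  /** Harvest: save the look-ahead position, then put the main batch back. */
  void harvest() {
    int lookAheadPos = lastWritePos;
    swapBuffers();
    lastWritePos = savedPos;
    savedPos = lookAheadPos;
  }

  /** Next batch: bring the overflow row back and restore its position. */
  void startNextBatch() {
    swapBuffers();
    lastWritePos = savedPos;
    // Assumption: the harvested buffer was handed downstream, so start fresh.
    overflowBuffer = new int[4];
  }
}
```

In this sketch, a column with two rows written before overflow and one value written afterwards ends harvest with a last write position of 1 on the main data, and starts the next batch with a last write position of 0 on the overflow row.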

Constraints

A number of constraints are worth pointing out:
  • Writers are bound to vectors, so we can't easily swap vectors during overflow.
  • The project operator to which this operator feeds data also binds to vectors, so the same set of vectors must be presented on every batch.
  • The client binds to writers, so we cannot swap writers between main and overflow batches.
  • Therefore, the unit of swapping is the buffer that backs the vectors.
  • Swapping is not copying; it is only exchanging pointers.
  • The only copying in this entire process occurs when moving previously-written values in the overflow row to the new vector at the time of overflow.

Arrays

The above covers the case of scalar, top-level columns. The extension to scalar maps is straightforward: at run time, the members of maps are just simple scalar vectors that reside in a map name space, but the structure of map fields is the same as for top-level fields. (Think of map fields as being "flattened" into the top-level tuple.)

Arrays are a different matter: each row can have many values associated with it. Consider an array of scalars. We have:


    Row 0   Row 1     Row 2
    0 1 2   3 4 5     6 7 8
 [ [a b c] [d e f] | [g h i] ]
 
Here, the letters indicate values. The brackets show the overall vector (outer brackets) and individual rows (inner brackets). The vertical line shows where overflow occurred. The same rules as discussed earlier still apply, but we must consider both the row indexes and the array indexes.
  • Overflow occurs at the row level. Here row 2 overflowed and must be moved to the look-ahead vector.
  • Value movement occurs at the value level. Here, values 6, 7 and 8 must be moved to the look-ahead vector.
The result, after overflow, is:

    Row 0   Row 1       Row 0
    0 1 2   3 4 5       0 1 2
 [ [a b c] [d e f] ] [ [g h i] ]
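
The move shown above can be sketched with a simplified model in which an array column is an offsets array (one entry per row, plus an end marker) over a flat values array. The class ArrayOverflowSketch and its methods are hypothetical, and plain Java arrays stand in for vectors.

```java
import java.util.Arrays;

/** Hypothetical model of an array column: offsets per row over flat values. */
final class ArrayOverflowSketch {

  /** Copies the values of the overflow row (and any after it) to a new array. */
  static char[] lookAheadValues(int[] offsets, char[] values, int overflowRow) {
    int start = offsets[overflowRow];        // first value of the overflow row
    int end = offsets[offsets.length - 1];   // one past the last value written
    return Arrays.copyOfRange(values, start, end);
  }

  /** Builds look-ahead offsets, shifted so the overflow row starts at 0. */
  static int[] lookAheadOffsets(int[] offsets, int overflowRow) {
    int start = offsets[overflowRow];
    int[] result = new int[offsets.length - overflowRow];
    for (int i = 0; i < result.length; i++) {
      result[i] = offsets[overflowRow + i] - start;
    }
    return result;
  }
}
```

With offsets {0, 3, 6, 9} and values a through i, overflow on row 2 moves g, h and i to the look-ahead array, where they occupy positions 0 to 2 of the new row 0.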
 
Further, we must consider lists: a column may consist of a list of arrays. Or, a column may consist of an array of maps, one of which is a list of arrays. So, the above reasoning must apply recursively down the value tree.

As it turns out, there is a simple recursive algorithm, which is a simple extension of the reasoning for the top-level scalar case, that can handle arrays:

  • Start with the row index of the overflow row.
  • If column c, say, is an array, obtain the index of the first value for the overflow row.
  • If c is a list, or a repeated map, then repeat the above, for each member of c (a single column for a list, a set of columns for a map), but replace the row index with the index of the first element.
The result will be a walk of the value tree in which the overflow index starts as an index relative to the result set (a row index), and is recursively replaced with an array offset for each level of the array.
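
A minimal sketch of that walk is shown below. SketchNode is a hypothetical type, not the loader's model: a node with a non-null offsets array is treated as an array (or repeated map), a node without offsets passes the index through unchanged (a scalar or single map), and the elements of a scalar array are modeled as a child node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical value-tree node used to illustrate the recursive walk. */
final class SketchNode {
  final String name;
  final int[] offsets;  // null unless this node is an array or repeated map
  final List<SketchNode> children = new ArrayList<>();

  SketchNode(String name, int[] offsets) {
    this.name = name;
    this.offsets = offsets;
  }

  SketchNode add(SketchNode child) {
    children.add(child);
    return this;
  }

  /** Records where the overflow row starts in this node and in each descendant. */
  void collectOverflowStarts(int index, Map<String, Integer> out) {
    out.put(name, index);
    // For an array, replace the index with the offset of the first element.
    int childIndex = (offsets == null) ? index : offsets[index];
    for (SketchNode child : children) {
      child.collectOverflowStarts(childIndex, out);
    }
  }
}
```

Starting the walk at the root with the overflow row index gives each node the position at which its overflow data begins: a row index for top-level scalars, and an array offset at each nested level.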

Resynching Writers after Overflow

When an overflow occurs, our focus starts with the single top-level row that will not fit into the current batch. We move this row to the look-ahead vectors. Doing so is quite simple when each row is a simple tuple. As described above, the work is quite a bit more complex when the structure is a JSON-like tree flattened into vectors.

Consider the writers. Each writer corresponds to a single vector. Writers are grouped into logical tree nodes. Those in the root node write to (single, scalar) columns that are either top-level columns, or nested some level down in single-value (not array) tuples. Another tree level occurs in an array: the elements of the array use a different (faster-changing) index than the top (row-level) writers. Different arrays have different indexes: a row may have, say, four elements in array A, but 20 elements in array B.

Further, arrays can be singular (a repeated int, say) or for an entire tuple (a repeated map.) And, since Drill supports the full JSON model, in the most general case, there is a tree of array indexes that can be nested to an arbitrary level. (A row can have an array of maps which contains a column that is, itself, a list of repeated maps, a field of which is an array of ints.)

Writers handle this index tree via a tree of ColumnWriterIndex objects, often specialized for various tasks.

Now we can get to the key concept in this section: how we update those indexes after an overflow. The top-level index reverts to zero. (We start writing the 0th row in the new look-ahead batch.) But nested indexes (those for arrays) will start at some other position, depending on the number of elements already written in the overflow row. The number of such elements is determined by a top-down traversal of the tree (to determine the start offset of each array for the row). Resetting the writer indexes is a bottom-up process: based on the number of elements in that array, the writer index is reset to match.

This flow is the opposite of the "normal" case in which a new batch is started top-down, with each index being reset to zero.
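
One way to picture the reset is as a rebase of each index by the start offset found for the overflow row. The sketch below assumes that interpretation; SketchIndex is a hypothetical stand-in and is not the ColumnWriterIndex interface.

```java
/** Hypothetical writer index that can be rebased after an overflow. */
final class SketchIndex {
  int position;  // next position this writer will write to

  SketchIndex(int position) {
    this.position = position;
  }

  /**
   * Shifts the index so it is relative to the look-ahead vector.
   * overflowStart is where the overflow row begins at this level of the tree.
   */
  void rebase(int overflowStart) {
    position -= overflowStart;
  }
}
```

Under this assumption, a row index at overflow row 2 rebases to 0, while an array index at position 8, in an array whose overflow row begins at offset 6, rebases to 2 because two elements of that row were already written.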

The Need for a Uniform Structure

Drill has vastly different implementations and interfaces for:
  • Result sets (as a VectorContainer),
  • Arrays (as a generated repeated vector),
  • Lists (as a ListVector),
  • Repeated lists (as a vector), and
  • Repeated maps (as a RepeatedMapVector).
If we were to work directly with the above abstractions, the code would be vastly complex. Instead, we abstract out the common structure into the TupleModel abstraction. In particular, we use the single tuple model, which works with a single batch. This model provides a simple, uniform interface to work with columns and tuples (rows, maps), and a simple way to work with arrays. This interface reduces the above array algorithm to a simple set of recursive method calls.