Skip to content

Commit

Permalink
Refactor vector qual handling for arrow slots
Browse files Browse the repository at this point in the history
Move the code for vector qual execution to its own module. The vector
qual execution will produce a result in the form of a bitmap filter
for the arrow array. Add functions to the arrow slot to carry the
result bitmap in the arrow tuple table slot. This allows passing the
filter result to nodes above the node that computed the vector qual
result. This is necessary to, e.g., run vectorized aggregation above a
columnar scan.
  • Loading branch information
erimatnor committed Jan 23, 2025
1 parent bbf183c commit b2d6612
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 120 deletions.
3 changes: 2 additions & 1 deletion tsl/src/hypercore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/hypercore_handler.c
${CMAKE_CURRENT_SOURCE_DIR}/hypercore_proxy.c
${CMAKE_CURRENT_SOURCE_DIR}/relstats.c
${CMAKE_CURRENT_SOURCE_DIR}/utils.c)
${CMAKE_CURRENT_SOURCE_DIR}/utils.c
${CMAKE_CURRENT_SOURCE_DIR}/vector_quals.c)
if(PG_VERSION VERSION_GREATER_EQUAL "17.0")
list(APPEND SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/import/analyze.c)
endif()
Expand Down
9 changes: 9 additions & 0 deletions tsl/src/hypercore/arrow_tts.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ tts_arrow_init(TupleTableSlot *slot)
aslot->tuple_index = InvalidTupleIndex;
aslot->total_row_count = 0;
aslot->referenced_attrs = NULL;
aslot->arrow_qual_result = NULL;

/*
* Set up child slots, one for the non-compressed relation and one for the
Expand Down Expand Up @@ -120,6 +121,11 @@ tts_arrow_init(TupleTableSlot *slot)

Assert(TTS_EMPTY(slot));
Assert(TTS_EMPTY(aslot->noncompressed_slot));

/* Memory context that is reset for every new segment. Used to store, e.g.,
* vectorized filters. */
aslot->per_segment_mcxt =
GenerationContextCreateCompat(slot->tts_mcxt, "Per-segment memory context", 64 * 1024);
}

/*
Expand Down Expand Up @@ -262,6 +268,8 @@ tts_arrow_clear(TupleTableSlot *slot)
/* Clear arrow slot fields */
memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
aslot->arrow_cache_entry = NULL;
aslot->arrow_qual_result = NULL;
MemoryContextReset(aslot->per_segment_mcxt);
}

static inline void
Expand Down Expand Up @@ -333,6 +341,7 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t
aslot->arrow_cache_entry = NULL;
/* Clear valid attributes */
memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
MemoryContextReset(aslot->per_segment_mcxt);
}

/*
Expand Down
38 changes: 36 additions & 2 deletions tsl/src/hypercore/arrow_tts.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ typedef struct ArrowTupleTableSlot
int16 *attrs_offset_map; /* Offset number mappings between the
* non-compressed and compressed
* relation */

/* Per-segment data. The following data is allocated on the per-segment
* memory context which is reset for every new segment stored and
* processed in the slot. */
MemoryContext per_segment_mcxt;

const uint64 *arrow_qual_result; /* Bitmap with result of qual
* filtering over arrow_array. NULL if
* no filtering has been applied. */
} ArrowTupleTableSlot;

extern const TupleTableSlotOps TTSOpsArrowTuple;
Expand Down Expand Up @@ -197,9 +206,9 @@ arrow_slot_get_noncompressed_slot(TupleTableSlot *slot)
}

static inline uint16
arrow_slot_total_row_count(TupleTableSlot *slot)
arrow_slot_total_row_count(const TupleTableSlot *slot)
{
ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
const ArrowTupleTableSlot *aslot = (const ArrowTupleTableSlot *) slot;

Assert(TTS_IS_ARROWTUPLE(slot));
Assert(aslot->total_row_count > 0);
Expand Down Expand Up @@ -271,6 +280,23 @@ arrow_slot_is_last(const TupleTableSlot *slot)
return aslot->tuple_index == InvalidTupleIndex || aslot->tuple_index == aslot->total_row_count;
}

/*
 * Attach a vector qual result bitmap to an arrow slot.
 *
 * The slot only stores the pointer; passing NULL marks that no filtering
 * has been applied to the arrow array.
 */
static inline void
arrow_slot_set_qual_result(TupleTableSlot *slot, const uint64 *qual_result)
{
	Assert(TTS_IS_ARROWTUPLE(slot));
	((ArrowTupleTableSlot *) slot)->arrow_qual_result = qual_result;
}

/*
 * Get the vector qual result bitmap carried by an arrow slot.
 *
 * Returns the pointer previously stored in the slot's arrow_qual_result
 * field, or NULL if no filtering has been applied.
 */
static inline const uint64 *
arrow_slot_get_qual_result(const TupleTableSlot *slot)
{
	const ArrowTupleTableSlot *aslot = (const ArrowTupleTableSlot *) slot;

	/* The cast above is only valid for arrow slots; check it like the other
	 * arrow slot accessors do. */
	Assert(TTS_IS_ARROWTUPLE(slot));

	return aslot->arrow_qual_result;
}

/*
* Increment or decrement an arrow slot to point to a subsequent row.
*
Expand Down Expand Up @@ -368,6 +394,14 @@ arrow_slot_try_getnext(TupleTableSlot *slot, ScanDirection direction)
return false;
}

/*
 * Get the per-segment memory context stored in an arrow slot.
 */
static inline MemoryContext
arrow_slot_per_segment_memory_context(const TupleTableSlot *slot)
{
	Assert(TTS_IS_ARROWTUPLE(slot));
	return ((const ArrowTupleTableSlot *) slot)->per_segment_mcxt;
}

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern const ArrowArray *arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno);
extern void arrow_slot_set_referenced_attrs(TupleTableSlot *slot, Bitmapset *attrs);
Expand Down
138 changes: 138 additions & 0 deletions tsl/src/hypercore/vector_quals.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include "nodes/decompress_chunk/vector_quals.h"
#include <utils/memutils.h>

#include "arrow_tts.h"
#include "vector_quals.h"

/*
* Support functions to execute vectorized quals over arrow tuple table slots.
*/

/*
 * Initialize a vector qual state for use with an arrow tuple table slot.
 *
 * The state is zeroed and then bound to the given quals and slot. The
 * per-vector memory context is the slot's per-segment memory context, and
 * arrow arrays are looked up via vector_qual_state_get_arrow_array().
 *
 * The number of results starts at zero when the slot is null/empty,
 * otherwise it is the total row count of the slot.
 */
void
vector_qual_state_init(VectorQualState *vqstate, List *quals, TupleTableSlot *slot)
{
	MemSet(vqstate, 0, sizeof(VectorQualState));

	vqstate->slot = slot;
	vqstate->vectorized_quals_constified = quals;
	vqstate->get_arrow_array = vector_qual_state_get_arrow_array;
	vqstate->per_vector_mcxt = arrow_slot_per_segment_memory_context(slot);

	if (TupIsNull(slot))
		vqstate->num_results = 0;
	else
		vqstate->num_results = arrow_slot_total_row_count(slot);
}

/*
 * Reset the vector qual state.
 *
 * Call this when all values in the arrow array have been processed. It
 * resets the per-vector memory context, drops the reference to the previous
 * qual result (both in the state and in the slot), and sets the expected
 * number of results to the slot's total row count.
 */
void
vector_qual_state_reset(VectorQualState *vqstate)
{
	TupleTableSlot *slot = vqstate->slot;

	MemoryContextReset(vqstate->per_vector_mcxt);
	vqstate->vector_qual_result = NULL;
	vqstate->num_results = arrow_slot_total_row_count(slot);
	arrow_slot_set_qual_result(slot, NULL);
}

/*
 * Implementation of VectorQualState->get_arrow_array() for arrow tuple table
 * slots.
 *
 * Given a VectorQualState, return the ArrowArray in the contained slot for
 * the column referenced by "expr", which must be a Var node.
 *
 * If the slot has no arrow array for the column, a single-value ArrowArray
 * is built from the slot's datum for that column and "*is_default_value" is
 * set to true. Otherwise "*is_default_value" is set to false.
 */
const ArrowArray *
vector_qual_state_get_arrow_array(VectorQualState *vqstate, Expr *expr, bool *is_default_value)
{
	TupleTableSlot *slot = vqstate->slot;
	const Var *var = castNode(Var, expr);
	const int attoff = AttrNumberGetAttrOffset(var->varattno);
	const ArrowArray *array = arrow_slot_get_array(slot, var->varattno);

	if (array == NULL)
	{
		/* Use the tuple descriptor accessor instead of indexing the attrs
		 * array directly, so the code does not depend on the internal layout
		 * of the tuple descriptor. */
		Form_pg_attribute attr = TupleDescAttr(slot->tts_tupleDescriptor, attoff);

		/*
		 * If getting here, this is a non-compressed value or a compressed
		 * column with a default value. We can treat non-compressed values the
		 * same as default ones. It is not possible to fall back to the
		 * non-vectorized quals now, so build a single-value ArrowArray with
		 * this (default) value, check if it passes the predicate, and apply
		 * it to the entire batch.
		 */
		array = make_single_value_arrow(attr->atttypid,
										slot->tts_values[attoff],
										slot->tts_isnull[attoff]);
		*is_default_value = true;
	}
	else
	{
		*is_default_value = false;
	}

	return array;
}

/*
 * Execute vectorized filter over a vector/array of values.
 *
 * The vector quals are (re)computed when the scan slot's row index is at
 * most 1; for later rows the previously computed result bitmap is reused.
 *
 * Returns the number of values filtered until the first valid value,
 * counting from the slot's current arrow offset. Returns the slot's total
 * row count when no rows pass, and 0 when all rows pass.
 */
uint16
ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)
{
	TupleTableSlot *scanslot = econtext->ecxt_scantuple;

	/* Compute the vector quals over both compressed and non-compressed
	 * tuples. In case a non-compressed tuple is filtered, the summary is
	 * SomeRowsPass although only one row will pass. */
	if (arrow_slot_row_index(scanslot) <= 1)
	{
		VectorQualSummary summary = AllRowsPass;

		vector_qual_state_reset(vqstate);

		/* Without any vectorized quals, every row trivially passes */
		if (vqstate->vectorized_quals_constified != NIL)
			summary = vector_qual_compute(vqstate);

		if (summary == NoRowsPass)
			return arrow_slot_total_row_count(scanslot);

		if (summary == AllRowsPass)
		{
			/*
			 * If all rows pass, no need to test the vector qual for each
			 * row. This is a common case for time range conditions.
			 */
			vector_qual_state_reset(vqstate);
			return 0;
		}

		/* SomeRowsPass: fall through and consult the result bitmap */
	}

	/* Fast path when all rows have passed (i.e., no rows filtered). No need
	 * to check qual result and it should be NULL. */
	if (vqstate->vector_qual_result == NULL)
		return 0;

	const uint16 total_rows = arrow_slot_total_row_count(scanslot);
	const uint16 start = arrow_slot_arrow_offset(scanslot);
	uint16 pos = start;

	/* Advance past filtered rows until the first row that passes */
	while (pos < total_rows && !arrow_row_is_valid(vqstate->vector_qual_result, pos))
		pos++;

	/* Expose the filter result via the slot so that it can be read from the
	 * slot as well */
	arrow_slot_set_qual_result(scanslot, vqstate->vector_qual_result);

	return pos - start;
}
17 changes: 17 additions & 0 deletions tsl/src/hypercore/vector_quals.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#pragma once

#include <postgres.h>
#include <nodes/execnodes.h>

#include "nodes/decompress_chunk/vector_quals.h"

/*
 * Zero the vector qual state and bind it to the given quals and arrow slot.
 * The state's per-vector memory context is taken from the slot's per-segment
 * memory context.
 */
extern void vector_qual_state_init(VectorQualState *vqstate, List *quals, TupleTableSlot *slot);
/*
 * Reset the state's per-vector memory context and clear the qual result in
 * both the state and its slot. Should be called when all values in the arrow
 * array have been processed.
 */
extern void vector_qual_state_reset(VectorQualState *vqstate);
/*
 * Return the arrow array for the Var in "expr" from the state's slot. If the
 * slot has no array for the column, a single-value array is built instead and
 * "*is_default_value" is set to true; otherwise it is set to false.
 */
extern const ArrowArray *vector_qual_state_get_arrow_array(VectorQualState *vqstate, Expr *expr,
bool *is_default_value);
/*
 * Execute the vectorized quals over the scan tuple slot in "econtext".
 * Returns the number of values filtered until the first valid value.
 */
extern uint16 ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext);
Loading

0 comments on commit b2d6612

Please sign in to comment.