diff --git a/tsl/src/hypercore/CMakeLists.txt b/tsl/src/hypercore/CMakeLists.txt
index 8719d603956..06d0efee6f3 100644
--- a/tsl/src/hypercore/CMakeLists.txt
+++ b/tsl/src/hypercore/CMakeLists.txt
@@ -7,7 +7,8 @@ set(SOURCES
   ${CMAKE_CURRENT_SOURCE_DIR}/hypercore_handler.c
   ${CMAKE_CURRENT_SOURCE_DIR}/hypercore_proxy.c
   ${CMAKE_CURRENT_SOURCE_DIR}/relstats.c
-  ${CMAKE_CURRENT_SOURCE_DIR}/utils.c)
+  ${CMAKE_CURRENT_SOURCE_DIR}/utils.c
+  ${CMAKE_CURRENT_SOURCE_DIR}/vector_quals.c)
 if(PG_VERSION VERSION_GREATER_EQUAL "17.0")
   list(APPEND SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/import/analyze.c)
 endif()
diff --git a/tsl/src/hypercore/arrow_tts.c b/tsl/src/hypercore/arrow_tts.c
index b6e91e874e9..c666434cdfc 100644
--- a/tsl/src/hypercore/arrow_tts.c
+++ b/tsl/src/hypercore/arrow_tts.c
@@ -93,6 +93,7 @@ tts_arrow_init(TupleTableSlot *slot)
 	aslot->tuple_index = InvalidTupleIndex;
 	aslot->total_row_count = 0;
 	aslot->referenced_attrs = NULL;
+	aslot->arrow_qual_result = NULL;
 
 	/*
 	 * Set up child slots, one for the non-compressed relation and one for the
@@ -120,6 +121,11 @@ tts_arrow_init(TupleTableSlot *slot)
 
 	Assert(TTS_EMPTY(slot));
 	Assert(TTS_EMPTY(aslot->noncompressed_slot));
+
+	/* Memory context that is reset for every new segment. Used to store,
+	 * e.g., vectorized filter results. */
+	aslot->per_segment_mcxt =
+		GenerationContextCreateCompat(slot->tts_mcxt, "Per-segment memory context", 64 * 1024);
 }
 
 /*
@@ -262,6 +268,8 @@ tts_arrow_clear(TupleTableSlot *slot)
 	/* Clear arrow slot fields */
 	memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
 	aslot->arrow_cache_entry = NULL;
+	aslot->arrow_qual_result = NULL;
+	MemoryContextReset(aslot->per_segment_mcxt);
 }
 
 static inline void
@@ -333,6 +341,7 @@ tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *child_slot, uint16 t
 	aslot->arrow_cache_entry = NULL;
 	/* Clear valid attributes */
 	memset(aslot->valid_attrs, 0, sizeof(bool) * slot->tts_tupleDescriptor->natts);
+	MemoryContextReset(aslot->per_segment_mcxt);
 }
 
 /*
diff --git a/tsl/src/hypercore/arrow_tts.h b/tsl/src/hypercore/arrow_tts.h
index 48c9f9c8303..2af5306d96f 100644
--- a/tsl/src/hypercore/arrow_tts.h
+++ b/tsl/src/hypercore/arrow_tts.h
@@ -79,6 +79,15 @@ typedef struct ArrowTupleTableSlot
 	int16 *attrs_offset_map; /* Offset number mappings between the
 							  * non-compressed and compressed
 							  * relation */
+
+	/* Per-segment data. The following data is allocated on the per-segment
+	 * memory context which is reset for every new segment stored and
+	 * processed in the slot. */
+	MemoryContext per_segment_mcxt;
+
+	const uint64 *arrow_qual_result; /* Bitmap with result of qual
+									  * filtering over arrow_array. NULL if
+									  * no filtering has been applied. */
 } ArrowTupleTableSlot;
 
 extern const TupleTableSlotOps TTSOpsArrowTuple;
@@ -197,9 +206,9 @@ arrow_slot_get_noncompressed_slot(TupleTableSlot *slot)
 }
 
 static inline uint16
-arrow_slot_total_row_count(TupleTableSlot *slot)
+arrow_slot_total_row_count(const TupleTableSlot *slot)
 {
-	ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
+	const ArrowTupleTableSlot *aslot = (const ArrowTupleTableSlot *) slot;
 
 	Assert(TTS_IS_ARROWTUPLE(slot));
 	Assert(aslot->total_row_count > 0);
@@ -271,6 +280,23 @@ arrow_slot_is_last(const TupleTableSlot *slot)
 	return aslot->tuple_index == InvalidTupleIndex || aslot->tuple_index == aslot->total_row_count;
 }
 
+static inline void
+arrow_slot_set_qual_result(TupleTableSlot *slot, const uint64 *qual_result)
+{
+	ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
+
+	Assert(TTS_IS_ARROWTUPLE(slot));
+	aslot->arrow_qual_result = qual_result;
+}
+
+static inline const uint64 *
+arrow_slot_get_qual_result(const TupleTableSlot *slot)
+{
+	const ArrowTupleTableSlot *aslot = (const ArrowTupleTableSlot *) slot;
+
+	return aslot->arrow_qual_result;
+}
+
 /*
  * Increment or decrement an arrow slot to point to a subsequent row.
  *
@@ -368,6 +394,14 @@ arrow_slot_try_getnext(TupleTableSlot *slot, ScanDirection direction)
 	return false;
 }
 
+static inline MemoryContext
+arrow_slot_per_segment_memory_context(const TupleTableSlot *slot)
+{
+	const ArrowTupleTableSlot *aslot = (const ArrowTupleTableSlot *) slot;
+	Assert(TTS_IS_ARROWTUPLE(slot));
+	return aslot->per_segment_mcxt;
+}
+
 extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
 extern const ArrowArray *arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno);
 extern void arrow_slot_set_referenced_attrs(TupleTableSlot *slot, Bitmapset *attrs);
diff --git a/tsl/src/hypercore/vector_quals.c b/tsl/src/hypercore/vector_quals.c
new file mode 100644
index 00000000000..3635a5247c3
--- /dev/null
+++ b/tsl/src/hypercore/vector_quals.c
@@ -0,0 +1,138 @@
+/*
+ * This file and its contents are licensed under the Timescale License.
+ * Please see the included NOTICE for copyright information and
+ * LICENSE-TIMESCALE for a copy of the license.
+ */
+#include <postgres.h>
+#include "nodes/decompress_chunk/vector_quals.h"
+#include <utils/memutils.h>
+
+#include "arrow_tts.h"
+#include "vector_quals.h"
+
+/*
+ * Support functions to execute vectorized quals over arrow tuple table slots.
+ */
+
+/*
+ * Initialize the vector qual state.
+ */
+void
+vector_qual_state_init(VectorQualState *vqstate, List *quals, TupleTableSlot *slot)
+{
+	MemSet(vqstate, 0, sizeof(VectorQualState));
+	vqstate->vectorized_quals_constified = quals;
+	vqstate->per_vector_mcxt = arrow_slot_per_segment_memory_context(slot);
+	vqstate->get_arrow_array = vector_qual_state_get_arrow_array;
+	vqstate->num_results = TupIsNull(slot) ? 0 : arrow_slot_total_row_count(slot);
+	vqstate->slot = slot;
+}
+
+/*
+ * Reset the vector qual state.
+ *
+ * The function should be called when all values in the arrow array have been
+ * processed.
+ */
+void
+vector_qual_state_reset(VectorQualState *vqstate)
+{
+	MemoryContextReset(vqstate->per_vector_mcxt);
+	vqstate->vector_qual_result = NULL;
+	vqstate->num_results = arrow_slot_total_row_count(vqstate->slot);
+	arrow_slot_set_qual_result(vqstate->slot, NULL);
+}
+
+/*
+ * Implementation of VectorQualState->get_arrow_array() for arrow tuple table
+ * slots.
+ *
+ * Given a VectorQualState, return the ArrowArray in the contained slot.
+ */
+const ArrowArray *
+vector_qual_state_get_arrow_array(VectorQualState *vqstate, Expr *expr, bool *is_default_value)
+{
+	TupleTableSlot *slot = vqstate->slot;
+	const Var *var = castNode(Var, expr);
+	const int attoff = AttrNumberGetAttrOffset(var->varattno);
+	const ArrowArray *array = arrow_slot_get_array(slot, var->varattno);
+
+	if (array == NULL)
+	{
+		Form_pg_attribute attr = &slot->tts_tupleDescriptor->attrs[attoff];
+		/*
+		 * If getting here, this is a non-compressed value or a compressed
+		 * column with a default value. We can treat non-compressed values the
+		 * same as default ones. It is not possible to fall back to the
+		 * non-vectorized quals now, so build a single-value ArrowArray with
+		 * this (default) value, check if it passes the predicate, and apply
+		 * it to the entire batch.
+		 */
+		array = make_single_value_arrow(attr->atttypid,
+										slot->tts_values[attoff],
+										slot->tts_isnull[attoff]);
+		*is_default_value = true;
+	}
+	else
+		*is_default_value = false;
+
+	return array;
+}
+
+/*
+ * Execute vectorized filter over a vector/array of values.
+ *
+ * Returns the number of values filtered until the first valid value.
+ */
+uint16
+ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)
+{
+	TupleTableSlot *slot = econtext->ecxt_scantuple;
+	const uint16 rowindex = arrow_slot_row_index(slot);
+
+	/* Compute the vector quals over both compressed and non-compressed
+	 * tuples. In case a non-compressed tuple is filtered, return SomeRowsPass
+	 * although only one row will pass. */
+	if (rowindex <= 1)
+	{
+		vector_qual_state_reset(vqstate);
+		VectorQualSummary vector_qual_summary = vqstate->vectorized_quals_constified != NIL ?
+													vector_qual_compute(vqstate) :
+													AllRowsPass;
+
+		switch (vector_qual_summary)
+		{
+			case NoRowsPass:
+				return arrow_slot_total_row_count(slot);
+			case AllRowsPass:
+				/*
+				 * If all rows pass, no need to test the vector qual for each row. This
+				 * is a common case for time range conditions.
+				 */
+				vector_qual_state_reset(vqstate);
+				return 0;
+			case SomeRowsPass:
+				break;
+		}
+	}
+
+	/* Fast path when all rows have passed (i.e., no rows filtered). No need
+	 * to check qual result and it should be NULL. */
+	if (vqstate->vector_qual_result == NULL)
+		return 0;
+
+	const uint16 nrows = arrow_slot_total_row_count(slot);
+	const uint16 off = arrow_slot_arrow_offset(slot);
+	uint16 nfiltered = 0;
+
+	for (uint16 i = off; i < nrows; i++)
+	{
+		if (arrow_row_is_valid(vqstate->vector_qual_result, i))
+			break;
+		nfiltered++;
+	}
+
+	arrow_slot_set_qual_result(slot, vqstate->vector_qual_result);
+
+	return nfiltered;
+}
diff --git a/tsl/src/hypercore/vector_quals.h b/tsl/src/hypercore/vector_quals.h
new file mode 100644
index 00000000000..5563f0676a1
--- /dev/null
+++ b/tsl/src/hypercore/vector_quals.h
@@ -0,0 +1,17 @@
+/*
+ * This file and its contents are licensed under the Timescale License.
+ * Please see the included NOTICE for copyright information and
+ * LICENSE-TIMESCALE for a copy of the license.
+ */
+#pragma once
+
+#include <postgres.h>
+#include <nodes/execnodes.h>
+
+#include "nodes/decompress_chunk/vector_quals.h"
+
+extern void vector_qual_state_init(VectorQualState *vqstate, List *quals, TupleTableSlot *slot);
+extern void vector_qual_state_reset(VectorQualState *vqstate);
+extern const ArrowArray *vector_qual_state_get_arrow_array(VectorQualState *vqstate, Expr *expr,
+														   bool *is_default_value);
+extern uint16 ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext);
diff --git a/tsl/src/nodes/columnar_scan/columnar_scan.c b/tsl/src/nodes/columnar_scan/columnar_scan.c
index 1231eeb85d3..db48f3ea492 100644
--- a/tsl/src/nodes/columnar_scan/columnar_scan.c
+++ b/tsl/src/nodes/columnar_scan/columnar_scan.c
@@ -27,12 +27,11 @@
 #include <utils/typcache.h>
 
 #include "columnar_scan.h"
-#include "compression/arrow_c_data_interface.h"
 #include "compression/compression.h"
 #include "hypercore/arrow_tts.h"
 #include "hypercore/hypercore_handler.h"
+#include "hypercore/vector_quals.h"
 #include "import/ts_explain.h"
-#include "nodes/decompress_chunk/vector_quals.h"
 
 typedef struct SimpleProjInfo
 {
@@ -67,60 +66,6 @@ match_relvar(Expr *expr, Index relid)
 	return false;
 }
 
-/*
- * ColumnarScan implementation of VectorQualState->get_arrow_array().
- *
- * Given a VectorQualState return the ArrowArray in the contained slot.
- */
-static const ArrowArray *
-vector_qual_state_get_arrow_array(VectorQualState *vqstate, Expr *expr, bool *is_default_value)
-{
-	TupleTableSlot *slot = vqstate->slot;
-	const Var *var = castNode(Var, expr);
-	const int attoff = AttrNumberGetAttrOffset(var->varattno);
-	const ArrowArray *array = arrow_slot_get_array(slot, var->varattno);
-
-	if (array == NULL)
-	{
-		Form_pg_attribute attr = &slot->tts_tupleDescriptor->attrs[attoff];
-		/*
-		 * If getting here, this is a non-compressed value or a compressed
-		 * column with a default value. We can treat non-compressed values the
-		 * same as default ones. It is not possible to fall back to the
-		 * non-vectorized quals now, so build a single-value ArrowArray with
-		 * this (default) value, check if it passes the predicate, and apply
-		 * it to the entire batch.
-		 */
-		array = make_single_value_arrow(attr->atttypid,
-										slot->tts_values[attoff],
-										slot->tts_isnull[attoff]);
-		*is_default_value = true;
-	}
-	else
-		*is_default_value = false;
-
-	return array;
-}
-
-static void
-vector_qual_state_reset(VectorQualState *vqstate, ExprContext *econtext)
-{
-	MemoryContextReset(vqstate->per_vector_mcxt);
-	vqstate->vector_qual_result = NULL;
-	vqstate->slot = econtext->ecxt_scantuple;
-	vqstate->num_results = arrow_slot_total_row_count(vqstate->slot);
-}
-
-static void
-vector_qual_state_init(VectorQualState *vqstate, ExprContext *econtext)
-{
-	vqstate->per_vector_mcxt = GenerationContextCreateCompat(econtext->ecxt_per_query_memory,
-															 "Per-vector memory context",
-															 64 * 1024);
-	vqstate->get_arrow_array = vector_qual_state_get_arrow_array;
-	vqstate->slot = econtext->ecxt_scantuple;
-}
-
 /*
  * Utility function to extract quals that can be used as scankeys. The
  * remaining "normal" quals are optionally collected in the corresponding
@@ -284,62 +229,6 @@ create_scankeys_from_quals(const HypercoreInfo *hsinfo, Index relid, const List
 	return scankeys;
 }
 
-/*
- * Execute vectorized filter over a vector/array of values.
- *
- * Returns the number of values filtered until the first valid value.
- */
-static inline uint16
-ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)
-{
-	TupleTableSlot *slot = econtext->ecxt_scantuple;
-	const uint16 rowindex = arrow_slot_row_index(slot);
-
-	/* Compute the vector quals over both compressed and non-compressed
-	 * tuples. In case a non-compressed tuple is filtered, return SomeRowsPass
-	 * although only one row will pass. */
-	if (rowindex <= 1)
-	{
-		vector_qual_state_reset(vqstate, econtext);
-		VectorQualSummary vector_qual_summary = vqstate->vectorized_quals_constified != NIL ?
-													vector_qual_compute(vqstate) :
-													AllRowsPass;
-
-		switch (vector_qual_summary)
-		{
-			case NoRowsPass:
-				return arrow_slot_total_row_count(slot);
-			case AllRowsPass:
-				/*
-				 * If all rows pass, no need to test the vector qual for each row. This
-				 * is a common case for time range conditions.
-				 */
-				vector_qual_state_reset(vqstate, econtext);
-				return 0;
-			case SomeRowsPass:
-				break;
-		}
-	}
-
-	/* Fast path when all rows have passed (i.e., no rows filtered). No need
-	 * to check qual result and it should be NULL. */
-	if (vqstate->vector_qual_result == NULL)
-		return 0;
-
-	const uint16 nrows = arrow_slot_total_row_count(slot);
-	const uint16 off = arrow_slot_arrow_offset(slot);
-	uint16 nfiltered = 0;
-
-	for (uint16 i = off; i < nrows; i++)
-	{
-		if (arrow_row_is_valid(vqstate->vector_qual_result, i))
-			break;
-		nfiltered++;
-	}
-
-	return nfiltered;
-}
-
 static pg_attribute_always_inline TupleTableSlot *
 exec_projection(SimpleProjInfo *spi)
 {
@@ -391,6 +280,17 @@ getnextslot(TableScanDesc scandesc, ScanDirection direction, TupleTableSlot *slo
 	return table_scan_getnextslot(scandesc, direction, slot);
 }
 
+static bool
+should_project(const CustomScanState *state)
+{
+#if PG15_GE
+	const CustomScan *scan = castNode(CustomScan, state->ss.ps.plan);
+	return scan->flags & CUSTOMPATH_SUPPORT_PROJECTION;
+#else
+	return false;
+#endif
+}
+
 static TupleTableSlot *
 columnar_scan_exec(CustomScanState *state)
 {
@@ -399,16 +299,19 @@ columnar_scan_exec(CustomScanState *state)
 	EState *estate;
 	ExprContext *econtext;
 	ExprState *qual;
-	ProjectionInfo *projinfo;
 	ScanDirection direction;
 	TupleTableSlot *slot;
 	bool has_vecquals = cstate->vqstate.vectorized_quals_constified != NIL;
+	/*
+	 * The VectorAgg node could have requested no projection by unsetting the
+	 * "projection support flag", so only project if the flag is still set.
+	 */
+	ProjectionInfo *projinfo = should_project(state) ? state->ss.ps.ps_ProjInfo : NULL;
 
 	scandesc = state->ss.ss_currentScanDesc;
 	estate = state->ss.ps.state;
 	econtext = state->ss.ps.ps_ExprContext;
 	qual = state->ss.ps.qual;
-	projinfo = state->ss.ps.ps_ProjInfo;
 	direction = estate->es_direction;
 	slot = state->ss.ss_ScanTupleSlot;
 
@@ -627,7 +530,7 @@ columnar_scan_begin(CustomScanState *state, EState *estate, int eflags)
 	ExecAssignScanProjectionInfo(&state->ss);
 	state->ss.ps.qual = ExecInitQual(state->ss.ps.plan->qual, (PlanState *) state);
 #endif
-	vector_qual_state_init(&cstate->vqstate, state->ss.ps.ps_ExprContext);
+	List *vectorized_quals_constified = NIL;
 
 	if (cstate->nscankeys > 0)
 	{
@@ -647,10 +550,16 @@ columnar_scan_begin(CustomScanState *state, EState *estate, int eflags)
 		foreach (lc, cstate->vectorized_quals_orig)
 		{
 			Node *constified = estimate_expression_value(&root, (Node *) lfirst(lc));
-			cstate->vqstate.vectorized_quals_constified =
-				lappend(cstate->vqstate.vectorized_quals_constified, constified);
+			vectorized_quals_constified = lappend(vectorized_quals_constified, constified);
 		}
 
+	/*
+	 * Initialize the state to compute vectorized quals.
+	 */
+	vector_qual_state_init(&cstate->vqstate,
+						   vectorized_quals_constified,
+						   state->ss.ss_ScanTupleSlot);
+
 	/* If the node is supposed to project, then try to make it a simple
 	 * projection. If not possible, it will fall back to standard PostgreSQL
 	 * projection. */
@@ -811,6 +720,7 @@ columnar_scan_initialize_worker(CustomScanState *node, shm_toc *toc, void *arg)
 }
 
 static CustomExecMethods columnar_scan_state_methods = {
+	.CustomName = "ColumnarScan",
 	.BeginCustomScan = columnar_scan_begin,
 	.ExecCustomScan = columnar_scan_exec,
 	.EndCustomScan = columnar_scan_end,
@@ -845,6 +755,12 @@ static CustomScanMethods columnar_scan_plan_methods = {
 	.CreateCustomScanState = columnar_scan_state_create,
 };
 
+bool
+is_columnar_scan(const CustomScan *scan)
+{
+	return scan->methods == &columnar_scan_plan_methods;
+}
+
 typedef struct VectorQualInfoHypercore
 {
 	VectorQualInfo vqinfo;
diff --git a/tsl/src/nodes/columnar_scan/columnar_scan.h b/tsl/src/nodes/columnar_scan/columnar_scan.h
index 9350e8fe2c5..1dc9c94f436 100644
--- a/tsl/src/nodes/columnar_scan/columnar_scan.h
+++ b/tsl/src/nodes/columnar_scan/columnar_scan.h
@@ -20,6 +20,7 @@ typedef struct ColumnarScanPath
 extern ColumnarScanPath *columnar_scan_path_create(PlannerInfo *root, RelOptInfo *rel,
 												   Relids required_outer, int parallel_workers);
 extern void columnar_scan_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht);
+extern bool is_columnar_scan(const CustomScan *scan);
 extern void _columnar_scan_init(void);
 
 #endif /* TIMESCALEDB_COLUMNAR_SCAN_H */
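
Note: the sketch below is editorial commentary, not part of the patch. It illustrates how the API moved into tsl/src/hypercore/vector_quals.c is driven by the scan node, based only on the calls visible in this diff. The loop is a simplified stand-in for the body of columnar_scan_exec(), which is not shown in full here; the row-skipping step is elided.

/* At node startup (see columnar_scan_begin() above): bind the constified
 * vectorized quals and the arrow scan slot to the qual state. The state now
 * borrows the slot's per-segment memory context, so per-vector results are
 * freed automatically whenever a new segment is stored in the slot. */
vector_qual_state_init(&cstate->vqstate,
					   vectorized_quals_constified,
					   state->ss.ss_ScanTupleSlot);

/* In the scan loop: ExecVectorQual() computes the vectorized quals once per
 * segment (when the slot points at its first row, rowindex <= 1) and returns
 * the number of rows to skip before the first row that passes. */
while (getnextslot(scandesc, direction, slot))
{
	econtext->ecxt_scantuple = slot;
	const uint16 nfiltered = ExecVectorQual(&cstate->vqstate, econtext);

	if (nfiltered == arrow_slot_total_row_count(slot))
		continue; /* NoRowsPass: no row in this segment passes, skip it */

	/* Illustrative: advance the arrow slot past the nfiltered rows, then
	 * apply the remaining non-vectorized quals and projection as usual. */
}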