Evaluate stable expressions in vectorized filters at run time
This allows vectorizing common filters such as ts > now() - interval '1 day'.
akuzm committed Nov 3, 2023
1 parent 7f4f23b commit 61f2606
Showing 24 changed files with 792 additions and 589 deletions.
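Before the per-file diffs, the core idea in brief: the planner cannot fold a predicate like ts > now() - interval '1 day' into the "Var op Const" shape that the vectorized predicates require, because now() is stable rather than immutable. A stable function does return the same value for the whole scan, however, so the executor can fold it once at startup. A minimal sketch of that folding, simplified from the exec.c hunk below (the helper name is hypothetical):

#include <postgres.h>
#include <optimizer/optimizer.h>

/*
 * Sketch: fold the stable parts of a qual at executor startup.
 * estimate_expression_value() evaluates stable functions such as now()
 * under the current snapshot, so "ts > now() - interval '1 day'" becomes
 * "ts > '2023-11-02 ...'::timestamptz", i.e. a Var-op-Const predicate
 * that the vectorized comparison operators can handle. The caller builds
 * a dummy PlannerInfo, as decompress_chunk_begin() does below.
 */
static Node *
constify_vectorized_qual(PlannerInfo *root, Node *qual)
{
	return estimate_expression_value(root, qual);
}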
4 changes: 2 additions & 2 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -81,7 +81,7 @@ make_single_value_arrow(Oid pgtype, Datum datum, bool isnull)
static void
apply_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
{
if (!chunk_state->vectorized_quals)
if (!chunk_state->vectorized_quals_constified)
{
return;
}
@@ -98,7 +98,7 @@ apply_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
* Compute the quals.
*/
ListCell *lc;
foreach (lc, chunk_state->vectorized_quals)
foreach (lc, chunk_state->vectorized_quals_constified)
{
/* For now we only support "Var ? Const" predicates. */
OpExpr *oe = castNode(OpExpr, lfirst(lc));
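Each predicate on this list is applied to an entire decompressed batch in one call, rather than row by row. Roughly, applying a single "Var op Const" predicate looks like the sketch below; the VectorPredicate signature and the helper name are illustrative assumptions, not the actual TimescaleDB definitions, although get_vector_const_predicate() is the real lookup function (see the planner.c diff further down).

#include <postgres.h>
#include <nodes/primnodes.h>
#include <utils/lsyscache.h>

/*
 * Sketch: apply one "Var op Const" predicate to a whole batch at once.
 * ArrowArray is the Arrow C data interface struct used for decompressed
 * columns; the signature below is an assumption for illustration.
 */
typedef void (*VectorPredicate)(const ArrowArray *column, Datum constant,
								uint64 *restrict result_bitmap);

static void
apply_one_vectorized_qual(const ArrowArray *column, OpExpr *oe,
						  uint64 *restrict result_bitmap)
{
	/* After constification, the right argument is always a Const. */
	Const *constnode = castNode(Const, lsecond(oe->args));

	/* Look up the columnar implementation of this comparison operator. */
	VectorPredicate vector_predicate =
		get_vector_const_predicate(get_opcode(oe->opno));

	/* Compute pass/fail bits for all rows of the batch in one call. */
	vector_predicate(column, constnode->constvalue, result_bitmap);
}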
57 changes: 54 additions & 3 deletions tsl/src/nodes/decompress_chunk/exec.c
@@ -11,6 +11,7 @@
#include <nodes/bitmapset.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <optimizer/optimizer.h>
#include <parser/parsetree.h>
#include <rewrite/rewriteManip.h>
#include <utils/datum.h>
@@ -160,7 +161,7 @@ decompress_chunk_state_create(CustomScan *cscan)

Assert(IsA(cscan->custom_exprs, List));
Assert(list_length(cscan->custom_exprs) == 1);
chunk_state->vectorized_quals = linitial(cscan->custom_exprs);
chunk_state->vectorized_quals_original = linitial(cscan->custom_exprs);

return (Node *) chunk_state;
}
@@ -475,6 +476,51 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
{
elog(ERROR, "debug: batch sorted merge is required but not used");
}

/* Constify stable expressions in vectorized predicates. */
chunk_state->have_constant_false_vectorized_qual = false;
PlannerGlobal glob = {
.boundParams = node->ss.ps.state->es_param_list_info,
};
PlannerInfo root = {
.glob = &glob,
};
ListCell *lc;
foreach (lc, chunk_state->vectorized_quals_original)
{
Node *constified = estimate_expression_value(&root, (Node *) lfirst(lc));

/*
* Note that some expressions are evaluated to a null Const, like a
* strict comparison with a stable expression that evaluates to null. If
* we have such a filter, no rows can pass, so we set a special flag to
* return early.
*/
if (IsA(constified, Const))
{
Const *c = castNode(Const, constified);
if (c->constisnull || !DatumGetBool(c->constvalue))
{
chunk_state->have_constant_false_vectorized_qual = true;
break;
}
else
{
/*
* This is a constant true qual, every row passes and we can
* just ignore it. No idea how it can happen though.
*/
Assert(false);
continue;
}
}

OpExpr *opexpr = castNode(OpExpr, constified);
Ensure(IsA(lsecond(opexpr->args), Const),
"failed to evaluate runtime constant in vectorized filter");
chunk_state->vectorized_quals_constified =
lappend(chunk_state->vectorized_quals_constified, constified);
}
}

/*
@@ -738,6 +784,11 @@ decompress_chunk_exec_impl(DecompressChunkState *chunk_state,
return perform_vectorized_aggregation(chunk_state);
}

if (chunk_state->have_constant_false_vectorized_qual)
{
return NULL;
}

queue->pop(chunk_state);
while (queue->needs_next_batch(chunk_state))
{
@@ -806,13 +857,13 @@ decompress_chunk_explain(CustomScanState *node, List *ancestors, ExplainState *es)
{
DecompressChunkState *chunk_state = (DecompressChunkState *) node;

ts_show_scan_qual(chunk_state->vectorized_quals,
ts_show_scan_qual(chunk_state->vectorized_quals_original,
"Vectorized Filter",
&node->ss.ps,
ancestors,
es);

if (!node->ss.ps.plan->qual && chunk_state->vectorized_quals)
if (!node->ss.ps.plan->qual && chunk_state->vectorized_quals_original)
{
/*
* The normal explain won't show this if there are no normal quals but
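A subtlety in the decompress_chunk_begin() hunk above: estimate_expression_value() can fold an entire qual down to a Const. In particular, the comparison operators involved are strict, so when the stable side evaluates to NULL, the whole OpExpr folds to a null Const; WHERE semantics reject a NULL filter result, so no row can pass, and the have_constant_false_vectorized_qual flag lets decompress_chunk_exec_impl() return early without decompressing anything. A sketch of the classification, with illustrative SQL in the comments (the helper name is hypothetical):

#include <postgres.h>
#include <nodes/primnodes.h>

/*
 * Possible outcomes of constifying one vectorized qual:
 *
 *   ts > now() - interval '1 day'  -> OpExpr "Var > Const": vectorize it
 *   ts > now() + NULL::interval    -> null Const:           no rows pass
 *   qual folded to true            -> true Const:           qual is a no-op
 */
static bool
qual_is_constant_false(Node *constified)
{
	if (!IsA(constified, Const))
	{
		/* Still an OpExpr; it goes to the constified list. */
		return false;
	}

	Const *c = castNode(Const, constified);
	/* A null Const behaves like false under WHERE semantics. */
	return c->constisnull || !DatumGetBool(c->constvalue);
}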
9 changes: 7 additions & 2 deletions tsl/src/nodes/decompress_chunk/exec.h
@@ -97,9 +97,14 @@ typedef struct DecompressChunkState
/*
* For some predicates, we have more efficient implementations that work on
* the entire compressed batch in one go. They go to this list, and the rest
* goes into the usual ss.ps.qual.
* goes into the usual ss.ps.qual. Note that we constify stable functions
* in these predicates at execution time, but have to keep the original
* version for EXPLAIN. We also need special handling for quals that
* evaluate to constant false, hence the flag.
*/
List *vectorized_quals;
List *vectorized_quals_original;
List *vectorized_quals_constified;
bool have_constant_false_vectorized_qual;

/*
* Make non-refcounted copies of the tupdesc for reuse across all batch states
102 changes: 88 additions & 14 deletions tsl/src/nodes/decompress_chunk/planner.c
@@ -377,27 +377,89 @@ find_attr_pos_in_tlist(List *targetlist, AttrNumber pos)
}

static bool
qual_is_vectorizable(DecompressChunkPath *path, Node *qual)
contains_volatile_functions_checker(Oid func_id, void *context)
{
return (func_volatile(func_id) == PROVOLATILE_VOLATILE);
}

static bool
is_not_runtime_constant_walker(Node *node, void *context)
{
if (node == NULL)
{
return false;
}

switch (nodeTag(node))
{
case T_Var:
case T_PlaceHolderVar:
case T_Param:
/*
* We might want to support these nodes to allow vectorizing
* join clauses (T_Var), join clauses referencing a variable that is
* above an outer join (T_PlaceHolderVar), and initplan or prepared
* statement parameters (T_Param). We don't support them at
* the moment.
*/
return true;
default:
if (check_functions_in_node(node,
contains_volatile_functions_checker,
/* context = */ NULL))
{
return true;
}
return expression_tree_walker(node,
is_not_runtime_constant_walker,
/* context = */ NULL);
}
}

/*
* Check if the given node is a run-time constant, i.e. it doesn't contain
* volatile functions or variables or parameters. This means we can evaluate
* it at run time, allowing us to apply the vectorized comparison operators
* that have the form "Var op Const". This applies for example to filter
* expressions like `time > now() - interval '1 hour'`.
* Note that we do the same evaluation when doing run time chunk exclusion, but
* there is no good way to pass the evaluated clauses to the underlying nodes
* like this DecompressChunk node.
*/
static bool
is_not_runtime_constant(Node *node)
{
bool result = is_not_runtime_constant_walker(node, /* context = */ NULL);
return result;
}

/*
* Try to check if the current qual is vectorizable, and if needed make a
* commuted copy. If not, return NULL.
*/
static Node *
make_vectorized_qual(DecompressChunkPath *path, Node *qual)
{
/* Only simple "Var op Const" binary predicates for now. */
if (!IsA(qual, OpExpr))
{
return false;
return NULL;
}

OpExpr *o = castNode(OpExpr, qual);

if (list_length(o->args) != 2)
{
return false;
return NULL;
}

if (IsA(lsecond(o->args), Var) && IsA(linitial(o->args), Const))
if (IsA(lsecond(o->args), Var))
{
/* If the variable is on the right, try to commute the operator to put it on the left. */
Oid commutator_opno = get_commutator(o->opno);
if (OidIsValid(commutator_opno))
{
o = (OpExpr *) copyObject(o);
o->opno = commutator_opno;
/*
* opfuncid is a cache, we can set it to InvalidOid like the
@@ -408,9 +470,14 @@ qual_is_vectorizable(DecompressChunkPath *path, Node *qual)
}
}

if (!IsA(linitial(o->args), Var) || !IsA(lsecond(o->args), Const))
/*
* We can vectorize the operation where the left side is a Var and the right
* side is a constant or can be evaluated to a constant at run time (e.g.
* contains stable functions).
*/
if (!IsA(linitial(o->args), Var) || is_not_runtime_constant(lsecond(o->args)))
{
return false;
return NULL;
}

Var *var = castNode(Var, linitial(o->args));
@@ -424,32 +491,39 @@ qual_is_vectorizable(DecompressChunkPath *path, Node *qual)
.bulk_decompression_possible)
{
/* This column doesn't support bulk decompression. */
return false;
return NULL;
}

Oid opcode = get_opcode(o->opno);
if (get_vector_const_predicate(opcode))
{
return true;
return (Node *) o;
}

return false;
return NULL;
}

/*
* Find the scan qualifiers that can be vectorized and put them into a separate
* list.
*/
static void
find_vectorized_quals(DecompressChunkPath *path, List *qual, List **vectorized,
find_vectorized_quals(DecompressChunkPath *path, List *qual_list, List **vectorized,
List **nonvectorized)
{
ListCell *lc;
foreach (lc, qual)
foreach (lc, qual_list)
{
Node *node = lfirst(lc);
List **dest = qual_is_vectorizable(path, node) ? vectorized : nonvectorized;
*dest = lappend(*dest, node);
Node *source_qual = lfirst(lc);
Node *vectorized_qual = make_vectorized_qual(path, source_qual);
if (vectorized_qual)
{
*vectorized = lappend(*vectorized, vectorized_qual);
}
else
{
*nonvectorized = lappend(*nonvectorized, source_qual);
}
}
}

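A worked example of the commuting step above: 5 < x is the same predicate as x > 5, and get_commutator() maps < to > via the system catalogs, so a Const-op-Var qual can be normalized into the Var-op-Const shape. A standalone sketch of that normalization follows; note the argument swap happens in lines folded out of the hunk above, so its exact form here is an assumption.

#include <postgres.h>
#include <nodes/pg_list.h>
#include <nodes/primnodes.h>
#include <utils/lsyscache.h>

/*
 * Sketch: put the Var on the left of a binary OpExpr by commuting the
 * operator, e.g. "5 < x" becomes "x > 5". Returns NULL if the operator
 * has no commutator.
 */
static OpExpr *
commute_qual(OpExpr *o)
{
	Oid commutator_opno = get_commutator(o->opno);
	if (!OidIsValid(commutator_opno))
		return NULL;

	OpExpr *commuted = (OpExpr *) copyObject(o);
	commuted->opno = commutator_opno;
	/* opfuncid is a lookup cache; InvalidOid makes it refresh on next use. */
	commuted->opfuncid = InvalidOid;
	/* Swap the arguments to match the commuted operator. */
	commuted->args = list_make2(lsecond(commuted->args), linitial(commuted->args));
	return commuted;
}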
16 changes: 6 additions & 10 deletions tsl/test/expected/agg_partials_pushdown.out
@@ -127,8 +127,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
Output: PARTIAL count(*), PARTIAL sum(_hyper_1_1_chunk.v0), PARTIAL sum(_hyper_1_1_chunk.v1), PARTIAL sum(_hyper_1_1_chunk.v2), PARTIAL sum(_hyper_1_1_chunk.v3)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=25 loops=1)
Output: _hyper_1_1_chunk.v0, _hyper_1_1_chunk.v1, _hyper_1_1_chunk.v2, _hyper_1_1_chunk.v3
Filter: (_hyper_1_1_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone)
Vectorized Filter: (_hyper_1_1_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone)
Vectorized Filter: ((_hyper_1_1_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_1_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk (actual rows=5 loops=1)
Output: compress_hyper_2_3_chunk.filter_1, compress_hyper_2_3_chunk.filler_2, compress_hyper_2_3_chunk.filler_3, compress_hyper_2_3_chunk."time", compress_hyper_2_3_chunk.device_id, compress_hyper_2_3_chunk.v0, compress_hyper_2_3_chunk.v1, compress_hyper_2_3_chunk.v2, compress_hyper_2_3_chunk.v3, compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk._ts_meta_sequence_num, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1
@@ -142,8 +141,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
Output: PARTIAL count(*), PARTIAL sum(_hyper_1_2_chunk.v0), PARTIAL sum(_hyper_1_2_chunk.v1), PARTIAL sum(_hyper_1_2_chunk.v2), PARTIAL sum(_hyper_1_2_chunk.v3)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk (actual rows=25 loops=1)
Output: _hyper_1_2_chunk.v0, _hyper_1_2_chunk.v1, _hyper_1_2_chunk.v2, _hyper_1_2_chunk.v3
Filter: (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone)
Vectorized Filter: (_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone)
Vectorized Filter: ((_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk (actual rows=5 loops=1)
Output: compress_hyper_2_4_chunk.filter_1, compress_hyper_2_4_chunk.filler_2, compress_hyper_2_4_chunk.filler_3, compress_hyper_2_4_chunk."time", compress_hyper_2_4_chunk.device_id, compress_hyper_2_4_chunk.v0, compress_hyper_2_4_chunk.v1, compress_hyper_2_4_chunk.v2, compress_hyper_2_4_chunk.v3, compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk._ts_meta_sequence_num, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1
@@ -153,7 +151,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
-> Seq Scan on _timescaledb_internal._hyper_1_2_chunk (actual rows=25 loops=1)
Output: _hyper_1_2_chunk.v0, _hyper_1_2_chunk.v1, _hyper_1_2_chunk.v2, _hyper_1_2_chunk.v3
Filter: ((_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
(37 rows)
(35 rows)

-- Force plain / sorted aggregation
SET enable_hashagg = OFF;
@@ -178,8 +176,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
Output: PARTIAL count(*), PARTIAL sum(_hyper_1_1_chunk.v0), PARTIAL sum(_hyper_1_1_chunk.v1), PARTIAL sum(_hyper_1_1_chunk.v2), PARTIAL sum(_hyper_1_1_chunk.v3)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (actual rows=25 loops=1)
Output: _hyper_1_1_chunk.v0, _hyper_1_1_chunk.v1, _hyper_1_1_chunk.v2, _hyper_1_1_chunk.v3
Filter: (_hyper_1_1_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone)
Vectorized Filter: (_hyper_1_1_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone)
Vectorized Filter: ((_hyper_1_1_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_1_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk (actual rows=5 loops=1)
Output: compress_hyper_2_3_chunk.filter_1, compress_hyper_2_3_chunk.filler_2, compress_hyper_2_3_chunk.filler_3, compress_hyper_2_3_chunk."time", compress_hyper_2_3_chunk.device_id, compress_hyper_2_3_chunk.v0, compress_hyper_2_3_chunk.v1, compress_hyper_2_3_chunk.v2, compress_hyper_2_3_chunk.v3, compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk._ts_meta_sequence_num, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1
@@ -193,8 +190,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
Output: PARTIAL count(*), PARTIAL sum(_hyper_1_2_chunk.v0), PARTIAL sum(_hyper_1_2_chunk.v1), PARTIAL sum(_hyper_1_2_chunk.v2), PARTIAL sum(_hyper_1_2_chunk.v3)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk (actual rows=25 loops=1)
Output: _hyper_1_2_chunk.v0, _hyper_1_2_chunk.v1, _hyper_1_2_chunk.v2, _hyper_1_2_chunk.v3
Filter: (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone)
Vectorized Filter: (_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone)
Vectorized Filter: ((_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk (actual rows=5 loops=1)
Output: compress_hyper_2_4_chunk.filter_1, compress_hyper_2_4_chunk.filler_2, compress_hyper_2_4_chunk.filler_3, compress_hyper_2_4_chunk."time", compress_hyper_2_4_chunk.device_id, compress_hyper_2_4_chunk.v0, compress_hyper_2_4_chunk.v1, compress_hyper_2_4_chunk.v2, compress_hyper_2_4_chunk.v3, compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk._ts_meta_sequence_num, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1
@@ -204,7 +200,7 @@ SELECT count(*), sum(v0), sum(v1), sum(v2), sum(v3) FROM testtable WHERE time >=
-> Seq Scan on _timescaledb_internal._hyper_1_2_chunk (actual rows=25 loops=1)
Output: _hyper_1_2_chunk.v0, _hyper_1_2_chunk.v1, _hyper_1_2_chunk.v2, _hyper_1_2_chunk.v3
Filter: ((_hyper_1_2_chunk."time" <= 'Mon Jan 31 16:00:00 2000 PST'::timestamp with time zone) AND (_hyper_1_2_chunk."time" >= ('2000-01-01 00:00:00+0'::cstring)::timestamp with time zone))
(37 rows)
(35 rows)

RESET enable_hashagg;
-- Check Append Node under ChunkAppend
[The remaining 19 of the 24 changed files are not shown in this view.]