diff --git a/.unreleased/fix_6428 b/.unreleased/fix_6428 new file mode 100644 index 00000000000..96ccd897105 --- /dev/null +++ b/.unreleased/fix_6428 @@ -0,0 +1 @@ +Fixes: #6428 Fix index matching during DML decompression diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 06880bf7ce4..9b489161b87 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -2618,229 +2618,53 @@ ts_fuzz_compression(PG_FUNCTION_ARGS) #if PG14_GE static BatchFilter * -make_batchfilter(char *column_name, StrategyNumber strategy, Const *value, bool is_null_check) +make_batchfilter(char *column_name, StrategyNumber strategy, Oid collation, RegProcedure opcode, + Const *value, bool is_null_check, bool is_null) { BatchFilter *segment_filter = palloc0(sizeof(*segment_filter)); *segment_filter = (BatchFilter){ .strategy = strategy, + .collation = collation, + .opcode = opcode, .value = value, .is_null_check = is_null_check, + .is_null = is_null, }; namestrcpy(&segment_filter->column_name, column_name); return segment_filter; } -/* - * Convert an expression to a Var referencing the index column. - * This method does following: - * 1. Change attribute numbers to match against index column position. - * 2. Set Var nodes varno to INDEX_VAR. - * - * For details refer: match_clause_to_index() and fix_indexqual_operand() - */ -static void -fix_index_qual(Relation comp_chunk_rel, Relation index_rel, Var *var, List **pred, - char *column_name, Node *node, Oid opno) -{ - int i = 0; -#if PG16_LT - Bitmapset *key_columns = RelationGetIndexAttrBitmap(comp_chunk_rel, INDEX_ATTR_BITMAP_ALL); -#else - Bitmapset *key_columns = - RelationGetIndexAttrBitmap(comp_chunk_rel, INDEX_ATTR_BITMAP_HOT_BLOCKING); - key_columns = - bms_add_members(key_columns, - RelationGetIndexAttrBitmap(comp_chunk_rel, INDEX_ATTR_BITMAP_SUMMARIZED)); -#endif - - for (i = 0; i < index_rel->rd_index->indnatts; i++) - { - AttrNumber attnum = index_rel->rd_index->indkey.values[i]; - char *colname = get_attname(RelationGetRelid(comp_chunk_rel), attnum, true); - if (strcmp(colname, column_name) == 0) - { - Oid opfamily = index_rel->rd_opfamily[i]; - /* assert if operator opno is not a member of opfamily */ - if (opno && !op_in_opfamily(opno, opfamily)) - Assert(false); - var->varattno = i + 1; - break; - } - } - /* mark this as an index column */ - var->varno = INDEX_VAR; - i = -1; - /* - * save predicates in the same order as that of columns - * defined in the index. - */ - while ((i = bms_next_member(key_columns, i)) > 0) - { - AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber; - char *attname = get_attname(comp_chunk_rel->rd_id, attno, false); - if (strcmp(attname, column_name) == 0) - { - pred[attno] = lappend(pred[attno], node); - break; - } - } -} - -/* - * This method will fix index qualifiers and also reorder - * index quals to match against the column order in the index. - * - * For example: - * for a given condition like "WHERE y = 10 AND x = 8" - * if matched index is defined as index (a,x,y) - * then WHERE condition should be rearraged as - * "WHERE x = 8 AND y = 10" - * - * This method will return, fixed and reordered index - * qualifier list. - */ -static List * -fix_and_reorder_index_filters(Relation comp_chunk_rel, Relation index_rel, - List *segmentby_predicates, List *index_filters) -{ - List *ordered_index_filters = NIL; - List *idx_filters[INDEX_MAX_KEYS]; - - for (int i = 0; i < INDEX_MAX_KEYS; i++) - idx_filters[i] = NIL; - - ListCell *lp; - ListCell *lf; - forboth (lp, segmentby_predicates, lf, index_filters) - { - Node *node = lfirst(lp); - BatchFilter *sf = lfirst(lf); - - if (node == NULL) - continue; - - Var *var; - Oid opno; - switch (nodeTag(node)) - { - case T_OpExpr: - { - OpExpr *opexpr = (OpExpr *) node; - Expr *leftop, *rightop; - Const *arg_value; - bool switch_operands = false; - - opno = opexpr->opno; - leftop = linitial(opexpr->args); - rightop = lsecond(opexpr->args); - - if (IsA(leftop, RelabelType)) - leftop = ((RelabelType *) leftop)->arg; - if (IsA(rightop, RelabelType)) - rightop = ((RelabelType *) rightop)->arg; - - if (IsA(leftop, Var) && IsA(rightop, Const)) - { - var = (Var *) leftop; - arg_value = (Const *) rightop; - } - else if (IsA(rightop, Var) && IsA(leftop, Const)) - { - switch_operands = true; - var = (Var *) rightop; - arg_value = (Const *) leftop; - } - else - continue; - OpExpr *newclause = NULL; - Expr *newvar = NULL; - newclause = makeNode(OpExpr); - memcpy(newclause, opexpr, sizeof(OpExpr)); - newvar = (Expr *) copyObject(var); - /* - * Index quals always should be in op form. If - * user specifies qual as op , we will convert to - * op form and change opno and opfamily accordingly - */ - linitial(newclause->args) = newvar; - lsecond(newclause->args) = arg_value; - if (switch_operands) - { - newclause->opno = get_commutator(opno); - newclause->opfuncid = get_opcode(newclause->opno); - opno = newclause->opno; - } - fix_index_qual(comp_chunk_rel, - index_rel, - (Var *) newvar, - idx_filters, - sf->column_name.data, - (Node *) newclause, - opno); - } - break; - case T_NullTest: - { - NullTest *ntest = (NullTest *) node; - if (IsA(ntest->arg, Var)) - { - var = (Var *) ntest->arg; - OpExpr *newclause = copyObject((OpExpr *) node); - Expr *newvar = (Expr *) copyObject(var); - ((NullTest *) newclause)->arg = newvar; - fix_index_qual(comp_chunk_rel, - index_rel, - (Var *) newvar, - idx_filters, - sf->column_name.data, - (Node *) newclause, - 0); - } - } - break; - default: - break; - } - } - /* Reorder the Var nodes to align with column order in indexes */ - for (int i = 0; i < INDEX_MAX_KEYS; i++) - { - if (idx_filters[i]) - { - ListCell *c; - foreach (c, idx_filters[i]) - { - ordered_index_filters = lappend(ordered_index_filters, lfirst(c)); - } - } - } - return ordered_index_filters; -} - /* * A compressed chunk can have multiple indexes. For a given list * of columns in index_filters, find the matching index which has - * all of the columns. + * the most columns based on index_filters and adjust the filters + * if necessary. * Return matching index if found else return NULL. * - * Note: This method will not find the best matching index. + * Note: This method will find the best matching index based on + * number of filters it matches. If an index matches all the filters, + * it will be chosen. Otherwise, it will try to select the index + * which has most matches. If there are multiple indexes have + * the same number of matches, it will pick the first one it finds. * For example * for a given condition like "WHERE X = 10 AND Y = 8" * if there are multiple indexes like * 1. index (a,b,c,x) * 2. index (a,x,y) * 3. index (x) - * 4. index (x,y) - * In this case 2nd index is returned. + * In this case 2nd index is returned. If that one didn't exist, + * it would return the 1st index. */ static Relation -find_matching_index(Relation comp_chunk_rel, List *index_filters) +find_matching_index(Relation comp_chunk_rel, List **index_filters, List **heap_filters) { List *index_oids; ListCell *lc; - int total_filters = index_filters->length; + int total_filters = list_length(*index_filters); + int max_match_count = 0; + Relation result_rel = NULL; /* get list of indexes defined on compressed chunk */ index_oids = RelationGetIndexList(comp_chunk_rel); @@ -2848,14 +2672,24 @@ find_matching_index(Relation comp_chunk_rel, List *index_filters) { int match_count = 0; Relation index_rel = index_open(lfirst_oid(lc), AccessShareLock); - if (index_rel->rd_index->indnatts < total_filters) + IndexInfo *index_info = BuildIndexInfo(index_rel); + + /* Can't use partial or expression indexes */ + if (index_info->ii_Predicate != NIL || index_info->ii_Expressions != NIL) { - /* skip all indexes which can never match */ index_close(index_rel, AccessShareLock); continue; } + + /* Can only use Btree indexes */ + if (index_info->ii_Am != BTREE_AM_OID) + { + index_close(index_rel, AccessShareLock); + continue; + } + ListCell *li; - foreach (li, index_filters) + foreach (li, *index_filters) { for (int i = 0; i < index_rel->rd_index->indnatts; i++) { @@ -2872,13 +2706,60 @@ find_matching_index(Relation comp_chunk_rel, List *index_filters) } if (match_count == total_filters) { - elog(DEBUG2, "index \"%s\" is used for scan. ", RelationGetRelationName(index_rel)); /* found index which has all columns specified in WHERE */ + if (result_rel) + index_close(result_rel, AccessShareLock); + if (ts_guc_debug_compression_path_info) + elog(INFO, "Index \"%s\" is used for scan. ", RelationGetRelationName(index_rel)); return index_rel; } + + if (match_count > max_match_count) + { + max_match_count = match_count; + result_rel = index_rel; + continue; + } index_close(index_rel, AccessShareLock); } - return NULL; + + /* No matching index whatsoever */ + if (!result_rel) + { + *heap_filters = list_concat(*heap_filters, *index_filters); + *index_filters = list_truncate(*index_filters, 0); + return NULL; + } + + /* We found an index which matches partially. + * It can be used but we need to transfer the unmatched + * filters from index_filters to heap filters. + */ + for (int i = 0; i < list_length(*index_filters); i++) + { + BatchFilter *sf = list_nth(*index_filters, i); + bool match = false; + for (int j = 0; j < result_rel->rd_index->indnatts; j++) + { + AttrNumber attnum = result_rel->rd_index->indkey.values[j]; + char *attname = get_attname(RelationGetRelid(comp_chunk_rel), attnum, false); + /* ensure column exists in index relation */ + if (!strcmp(attname, NameStr(sf->column_name))) + { + match = true; + break; + } + } + + if (!match) + { + *heap_filters = lappend(*heap_filters, sf); + *index_filters = list_delete_nth_cell(*index_filters, i); + } + } + if (ts_guc_debug_compression_path_info) + elog(INFO, "Index \"%s\" is used for scan. ", RelationGetRelationName(result_rel)); + return result_rel; } /* @@ -2891,8 +2772,8 @@ find_matching_index(Relation comp_chunk_rel, List *index_filters) * be used to build scan keys later. */ static void -fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index_filters, - List **segmentby_predicates, List **is_null) +fill_predicate_context(Chunk *ch, List *predicates, List **heap_filters, List **index_filters, + List **is_null) { ListCell *lc; foreach (lc, predicates) @@ -2906,6 +2787,8 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index case T_OpExpr: { OpExpr *opexpr = (OpExpr *) node; + RegProcedure opcode = opexpr->opfuncid; + Oid collation = opexpr->inputcollid; Expr *leftop, *rightop; Const *arg_value; @@ -2953,9 +2836,11 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index *index_filters = lappend(*index_filters, make_batchfilter(column_name, op_strategy, + collation, + opcode, arg_value, - false)); /* is_null_check */ - *segmentby_predicates = lappend(*segmentby_predicates, node); + false, /* is_null_check */ + false)); /* is_null */ } } } @@ -2967,38 +2852,50 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index case BTEqualStrategyNumber: { /* orderby col = value implies min <= value and max >= value */ - *filters = lappend(*filters, - make_batchfilter(column_segment_min_name(index), - BTLessEqualStrategyNumber, - arg_value, - false)); /* is_null_check */ - *filters = lappend(*filters, - make_batchfilter(column_segment_max_name(index), - BTGreaterEqualStrategyNumber, - arg_value, - false)); /* is_null_check */ + *heap_filters = lappend(*heap_filters, + make_batchfilter(column_segment_min_name(index), + BTLessEqualStrategyNumber, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false)); /* is_null */ + *heap_filters = lappend(*heap_filters, + make_batchfilter(column_segment_max_name(index), + BTGreaterEqualStrategyNumber, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false)); /* is_null */ } break; case BTLessStrategyNumber: case BTLessEqualStrategyNumber: { /* orderby col <[=] value implies min <[=] value */ - *filters = lappend(*filters, - make_batchfilter(column_segment_min_name(index), - op_strategy, - arg_value, - false)); /* is_null_check */ + *heap_filters = lappend(*heap_filters, + make_batchfilter(column_segment_min_name(index), + op_strategy, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false)); /* is_null */ } break; case BTGreaterStrategyNumber: case BTGreaterEqualStrategyNumber: { /* orderby col >[=] value implies max >[=] value */ - *filters = lappend(*filters, - make_batchfilter(column_segment_max_name(index), - op_strategy, - arg_value, - false)); /* is_null_check */ + *heap_filters = lappend(*heap_filters, + make_batchfilter(column_segment_max_name(index), + op_strategy, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false)); /* is_null */ } } } @@ -3018,12 +2915,15 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index ts_hypertable_compression_get_by_pkey(ch->fd.hypertable_id, column_name); if (COMPRESSIONCOL_IS_SEGMENT_BY(fd)) { - *index_filters = lappend(*index_filters, - make_batchfilter(column_name, - InvalidStrategy, - NULL, - true)); /* is_null_check */ - *segmentby_predicates = lappend(*segmentby_predicates, node); + *index_filters = + lappend(*index_filters, + make_batchfilter(column_name, + InvalidStrategy, + InvalidOid, + InvalidOid, + NULL, + true, /* is_null_check */ + ntest->nulltesttype == IS_NULL)); /* is_null */ if (ntest->nulltesttype == IS_NULL) *is_null = lappend_int(*is_null, 1); else @@ -3050,16 +2950,16 @@ fill_predicate_context(Chunk *ch, List *predicates, List **filters, List **index * OUT param null_columns is saved with column attribute number. */ static ScanKeyData * -build_update_delete_scankeys(RowDecompressor *decompressor, List *filters, int *num_scankeys, +build_update_delete_scankeys(RowDecompressor *decompressor, List *heap_filters, int *num_scankeys, Bitmapset **null_columns) { ListCell *lc; BatchFilter *filter; int key_index = 0; - ScanKeyData *scankeys = palloc0(filters->length * sizeof(ScanKeyData)); + ScanKeyData *scankeys = palloc0(heap_filters->length * sizeof(ScanKeyData)); - foreach (lc, filters) + foreach (lc, heap_filters) { filter = lfirst(lc); AttrNumber attno = get_attnum(decompressor->in_rel->rd_id, NameStr(filter->column_name)); @@ -3143,9 +3043,12 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num TupleTableSlot *slot = table_slot_create(decompressor->in_rel, NULL); TableScanDesc scan = table_beginscan(decompressor->in_rel, snapshot, num_scankeys, scankeys); + int num_scanned_rows = 0; + int num_filtered_rows = 0; while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { + num_scanned_rows++; bool skip_tuple = false; int attrno = bms_next_member(null_columns, -1); int pos = 0; @@ -3168,7 +3071,10 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num pos++; } if (skip_tuple) + { + num_filtered_rows++; continue; + } TM_FailureData tmfd; TM_Result result; @@ -3205,6 +3111,15 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num table_endscan(scan); ExecDropSingleTupleTableSlot(slot); + if (ts_guc_debug_compression_path_info) + { + elog(INFO, + "Number of compressed rows fetched from table scan: %d. " + "Number of compressed rows filtered: %d.", + num_scanned_rows, + num_filtered_rows); + } + return true; } @@ -3213,37 +3128,45 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num * scans on compressed chunks. */ static ScanKeyData * -build_index_scankeys(Relation in_rel, Relation index_rel, List *predicates, int *num_scankeys, - EState *estate) -{ - IndexScan *node = makeNode(IndexScan); - Plan *plan = &node->scan.plan; - - plan->qual = predicates; - node->scan.scanrelid = in_rel->rd_id; - node->indexid = index_rel->rd_id; - node->indexqual = predicates; - node->indexorderdir = ForwardScanDirection; - - IndexScanState *indexstate; - indexstate = makeNode(IndexScanState); - indexstate->ss.ps.plan = (Plan *) node; - indexstate->ss.ps.state = estate; - indexstate->iss_RelationDesc = index_rel; - - ExecIndexBuildScanKeys((PlanState *) indexstate, - indexstate->iss_RelationDesc, - node->indexqual, - false, - &indexstate->iss_ScanKeys, - &indexstate->iss_NumScanKeys, - &indexstate->iss_RuntimeKeys, - &indexstate->iss_NumRuntimeKeys, - NULL, /* no ArrayKeys */ - NULL); - - *num_scankeys = indexstate->iss_NumScanKeys; - return indexstate->iss_ScanKeys; +build_index_scankeys(Relation index_rel, List *index_filters, int *num_scankeys) +{ + ListCell *lc; + BatchFilter *filter = NULL; + *num_scankeys = list_length(index_filters); + ScanKeyData *scankey = palloc0(sizeof(ScanKeyData) * (*num_scankeys)); + int idx = 0; + int flags; + + /* Order scankeys based on index attribute order */ + for (int attno = 1; attno <= index_rel->rd_index->indnatts && idx < *num_scankeys; attno++) + { + char *attname = get_attname(RelationGetRelid(index_rel), attno, false); + foreach (lc, index_filters) + { + filter = lfirst(lc); + if (!strcmp(attname, NameStr(filter->column_name))) + { + flags = 0; + if (filter->is_null_check) + { + flags = SK_ISNULL | (filter->is_null ? SK_SEARCHNULL : SK_SEARCHNOTNULL); + } + + ScanKeyEntryInitialize(&scankey[idx++], + flags, + attno, + filter->strategy, + InvalidOid, /* no strategy subtype */ + filter->collation, + filter->opcode, + filter->value ? filter->value->constvalue : 0); + break; + } + } + } + + Assert(idx == *num_scankeys); + return scankey; } /* @@ -3256,11 +3179,12 @@ build_index_scankeys(Relation in_rel, Relation index_rel, List *predicates, int static bool decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel, ScanKeyData *index_scankeys, int num_index_scankeys, - ScanKeyData *scankeys, int num_scankeys, bool *chunk_status_changed) + ScanKeyData *scankeys, int num_scankeys, Bitmapset *null_columns, + List *is_nulls, bool *chunk_status_changed) { Snapshot snapshot = GetTransactionSnapshot(); int num_segmentby_filtered_rows = 0; - int num_orderby_filtered_rows = 0; + int num_heap_filtered_rows = 0; IndexScanDesc scan = index_beginscan(decompressor->in_rel, index_rel, snapshot, num_index_scankeys, 0); @@ -3295,7 +3219,7 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel #endif if (!valid) { - num_orderby_filtered_rows++; + num_heap_filtered_rows++; if (should_free) heap_freetuple(compressed_tuple); @@ -3303,6 +3227,33 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel } } + bool skip_tuple = false; + int attrno = bms_next_member(null_columns, -1); + int pos = 0; + bool is_null_condition = 0; + bool seg_col_is_null = false; + for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno)) + { + is_null_condition = list_nth_int(is_nulls, pos); + seg_col_is_null = slot_attisnull(slot, attrno); + if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition)) + { + /* + * if segment by column in the scanned tuple has non null value + * and IS NULL is specified, OR segment by column has null value + * and IS NOT NULL is specified then skip this tuple + */ + skip_tuple = true; + break; + } + pos++; + } + if (skip_tuple) + { + num_heap_filtered_rows++; + continue; + } + heap_deform_tuple(compressed_tuple, decompressor->in_desc, decompressor->compressed_datums, @@ -3338,9 +3289,9 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel { elog(INFO, "Number of compressed rows fetched from index: %d. " - "Number of compressed rows filtered by orderby columns: %d.", + "Number of compressed rows filtered by heap filters: %d.", num_segmentby_filtered_rows, - num_orderby_filtered_rows); + num_heap_filtered_rows); } ExecDropSingleTupleTableSlot(slot); @@ -3363,13 +3314,13 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu { /* process each chunk with its corresponding predicates */ - List *filters = NIL; + List *heap_filters = NIL; List *index_filters = NIL; - List *segmentby_predicates = NIL; List *is_null = NIL; ListCell *lc = NULL; Relation chunk_rel; Relation comp_chunk_rel; + Relation matching_index_rel = NULL; Chunk *comp_chunk; RowDecompressor decompressor; BatchFilter *filter; @@ -3381,44 +3332,36 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu ScanKeyData *index_scankeys = NULL; int num_index_scankeys = 0; - fill_predicate_context(chunk, - predicates, - &filters, - &index_filters, - &segmentby_predicates, - &is_null); + fill_predicate_context(chunk, predicates, &heap_filters, &index_filters, &is_null); chunk_rel = table_open(chunk->table_id, RowExclusiveLock); comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true); comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock); decompressor = build_decompressor(comp_chunk_rel, chunk_rel); + if (index_filters) + { + matching_index_rel = find_matching_index(comp_chunk_rel, &index_filters, &heap_filters); + } + write_logical_replication_msg_decompression_start(); - if (filters) + if (heap_filters) { scankeys = - build_update_delete_scankeys(&decompressor, filters, &num_scankeys, &null_columns); + build_update_delete_scankeys(&decompressor, heap_filters, &num_scankeys, &null_columns); } - if (index_filters) + if (matching_index_rel) { - List *ordered_index_filters = NIL; - Relation matching_index_rel = find_matching_index(comp_chunk_rel, index_filters); - Assert(matching_index_rel); - ordered_index_filters = fix_and_reorder_index_filters(comp_chunk_rel, - matching_index_rel, - segmentby_predicates, - index_filters); - index_scankeys = build_index_scankeys(decompressor.in_rel, - matching_index_rel, - ordered_index_filters, - &num_index_scankeys, - estate); + index_scankeys = + build_index_scankeys(matching_index_rel, index_filters, &num_index_scankeys); decompress_batches_using_index(&decompressor, matching_index_rel, index_scankeys, num_index_scankeys, scankeys, num_scankeys, + null_columns, + is_null, &chunk_status_changed); /* close the selected index */ index_close(matching_index_rel, AccessShareLock); @@ -3448,7 +3391,12 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu table_close(chunk_rel, NoLock); table_close(comp_chunk_rel, NoLock); - foreach (lc, filters) + foreach (lc, heap_filters) + { + filter = lfirst(lc); + pfree(filter); + } + foreach (lc, index_filters) { filter = lfirst(lc); pfree(filter); diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h index bc871e01340..a2622e2e6df 100644 --- a/tsl/src/compression/compression.h +++ b/tsl/src/compression/compression.h @@ -279,10 +279,15 @@ typedef struct BatchFilter NameData column_name; /* Filter operation used */ StrategyNumber strategy; + /* Collation to be used by the operator */ + Oid collation; + /* Operator code used */ + RegProcedure opcode; /* Value to compare with */ Const *value; /* IS NULL or IS NOT NULL */ bool is_null_check; + bool is_null; } BatchFilter; extern Datum tsl_compressed_data_decompress_forward(PG_FUNCTION_ARGS); diff --git a/tsl/test/expected/compression_update_delete.out b/tsl/test/expected/compression_update_delete.out index 94fbd1d26d9..0684ffa90c0 100644 --- a/tsl/test/expected/compression_update_delete.out +++ b/tsl/test/expected/compression_update_delete.out @@ -1329,7 +1329,8 @@ SELECT COUNT(*) AS "total_rows" FROM sample_table \gset SELECT COUNT(*) AS "total_affected_rows" FROM sample_table WHERE c4 = 5 \gset -- delete 10k rows DELETE FROM sample_table WHERE c4 = 5; -INFO: Number of compressed rows fetched from index: 10. Number of compressed rows filtered by orderby columns: 0. +INFO: Index "compress_hyper_20_38_chunk__compressed_hypertable_20_c4__ts_met" is used for scan. +INFO: Number of compressed rows fetched from index: 10. Number of compressed rows filtered by heap filters: 0. -- report 0 rows SELECT count(*) FROM sample_table WHERE c4 = 5; count @@ -1381,7 +1382,8 @@ SELECT COUNT(*) AS "total_rows" FROM sample_table \gset SELECT COUNT(*) AS "total_affected_rows" FROM sample_table WHERE c4 < 5 \gset -- delete 50k rows DELETE FROM sample_table WHERE c4 < 5; -INFO: Number of compressed rows fetched from index: 50. Number of compressed rows filtered by orderby columns: 0. +INFO: Index "compress_hyper_20_38_chunk__compressed_hypertable_20_c4__ts_met" is used for scan. +INFO: Number of compressed rows fetched from index: 50. Number of compressed rows filtered by heap filters: 0. -- report 0 rows SELECT count(*) FROM sample_table WHERE c4 < 5; count @@ -1433,7 +1435,8 @@ SELECT COUNT(*) AS "total_rows" FROM sample_table \gset SELECT COUNT(*) AS "total_affected_rows" FROM sample_table WHERE c4 >= 5 \gset -- delete 50k rows DELETE FROM sample_table WHERE c4 >= 5; -INFO: Number of compressed rows fetched from index: 50. Number of compressed rows filtered by orderby columns: 0. +INFO: Index "compress_hyper_20_38_chunk__compressed_hypertable_20_c4__ts_met" is used for scan. +INFO: Number of compressed rows fetched from index: 50. Number of compressed rows filtered by heap filters: 0. -- report 0 rows SELECT count(*) FROM sample_table WHERE c4 >= 5; count @@ -1486,6 +1489,7 @@ SELECT COUNT(*) AS "total_rows" FROM sample_table \gset SELECT COUNT(*) AS "total_affected_rows" FROM sample_table WHERE c2 = 3 \gset -- delete 10k rows DELETE FROM sample_table WHERE c2 = 3; +INFO: Number of compressed rows fetched from table scan: 100. Number of compressed rows filtered: 0. -- report 0 rows SELECT count(*) FROM sample_table WHERE c2 = 3; count @@ -1538,6 +1542,7 @@ SELECT COUNT(*) AS "total_rows" FROM sample_table \gset SELECT COUNT(*) AS "total_affected_rows" FROM sample_table WHERE c1 < 2 \gset -- delete 20k rows DELETE FROM sample_table WHERE c1 < 2; +INFO: Number of compressed rows fetched from table scan: 20. Number of compressed rows filtered: 0. -- report 0 rows SELECT count(*) FROM sample_table WHERE c1 < 2; count @@ -1590,6 +1595,7 @@ SELECT COUNT(*) AS "total_rows" FROM sample_table \gset SELECT COUNT(*) AS "total_affected_rows" FROM sample_table WHERE c1 >= 7 \gset -- delete 30k rows DELETE FROM sample_table WHERE c1 >= 7; +INFO: Number of compressed rows fetched from table scan: 30. Number of compressed rows filtered: 0. -- report 0 rows SELECT count(*) FROM sample_table WHERE c1 >= 7; count @@ -1643,7 +1649,8 @@ SELECT COUNT(*) AS "total_rows" FROM sample_table \gset SELECT COUNT(*) AS "total_affected_rows" FROM sample_table WHERE c4 = 5 and c1 = 5 \gset -- delete 1k rows DELETE FROM sample_table WHERE c4 = 5 and c1 = 5; -INFO: Number of compressed rows fetched from index: 10. Number of compressed rows filtered by orderby columns: 9. +INFO: Index "compress_hyper_20_38_chunk__compressed_hypertable_20_c4__ts_met" is used for scan. +INFO: Number of compressed rows fetched from index: 10. Number of compressed rows filtered by heap filters: 9. -- report 0 rows SELECT count(*) FROM sample_table WHERE c4 = 5 and c1 = 5; count @@ -1697,7 +1704,8 @@ SELECT COUNT(*) AS "total_rows" FROM sample_table \gset SELECT COUNT(*) AS "total_affected_rows" FROM sample_table WHERE c4 > 5 and c2 = 5 \gset -- delete 4k rows DELETE FROM sample_table WHERE c4 > 5 and c2 = 5; -INFO: Number of compressed rows fetched from index: 40. Number of compressed rows filtered by orderby columns: 0. +INFO: Index "compress_hyper_20_38_chunk__compressed_hypertable_20_c4__ts_met" is used for scan. +INFO: Number of compressed rows fetched from index: 40. Number of compressed rows filtered by heap filters: 0. -- report 0 rows SELECT count(*) FROM sample_table WHERE c4 > 5 and c2 = 5; count @@ -1759,7 +1767,8 @@ SELECT COUNT(*) FROM :COMPRESS_CHUNK_1 WHERE c4 = 4 AND _ts_meta_max_1 >= 7; SELECT COUNT(*) AS "total_rows" FROM :COMPRESS_CHUNK_1 WHERE c4 = 4 \gset SELECT COUNT(*) AS "total_affected_rows" FROM :COMPRESS_CHUNK_1 WHERE c4 = 4 AND _ts_meta_max_1 >= 7 \gset UPDATE sample_table SET c3 = c3 + 0 WHERE c4 = 4 AND c1 >= 7; -INFO: Number of compressed rows fetched from index: 10. Number of compressed rows filtered by orderby columns: 7. +INFO: Index "compress_hyper_20_38_chunk__compressed_hypertable_20_c4__ts_met" is used for scan. +INFO: Number of compressed rows fetched from index: 10. Number of compressed rows filtered by heap filters: 7. -- report 7 rows SELECT COUNT(*) FROM :COMPRESS_CHUNK_1 WHERE c4 = 4; count @@ -2391,7 +2400,8 @@ SELECT COUNT(*) FROM tab1 WHERE filler_3 = 5 AND filler_2 = 4; (1 row) UPDATE tab1 SET v0 = v1 + v2 WHERE filler_3 = 5 AND filler_2 = 4; -INFO: Number of compressed rows fetched from index: 4. Number of compressed rows filtered by orderby columns: 0. +INFO: Index "compress_hyper_2_2_chunk_filler_2_filler_3" is used for scan. +INFO: Number of compressed rows fetched from index: 4. Number of compressed rows filtered by heap filters: 0. ROLLBACK; BEGIN; SELECT COUNT(*) FROM tab1 WHERE filler_1 < 5 AND filler_2 = 4; @@ -2401,7 +2411,8 @@ SELECT COUNT(*) FROM tab1 WHERE filler_1 < 5 AND filler_2 = 4; (1 row) UPDATE tab1 SET v0 = v1 + v2 WHERE filler_1 < 5 AND filler_2 = 4; -INFO: Number of compressed rows fetched from index: 4. Number of compressed rows filtered by orderby columns: 0. +INFO: Index "compress_hyper_2_2_chunk_filler_1_filler_2" is used for scan. +INFO: Number of compressed rows fetched from index: 4. Number of compressed rows filtered by heap filters: 0. ROLLBACK; -- idealy filler_1 index should be selected, -- instead first matching index is selected @@ -2413,7 +2424,8 @@ SELECT COUNT(*) FROM tab1 WHERE filler_1 < 5; (1 row) UPDATE tab1 SET v0 = v1 + v2 WHERE filler_1 < 5; -INFO: Number of compressed rows fetched from index: 16. Number of compressed rows filtered by orderby columns: 0. +INFO: Index "compress_hyper_2_2_chunk__compressed_hypertable_2__ts_meta_mi_2" is used for scan. +INFO: Number of compressed rows fetched from index: 16. Number of compressed rows filtered by heap filters: 0. ROLLBACK; RESET timescaledb.debug_compression_path_info; DROP TABLE tab1; @@ -2440,3 +2452,206 @@ SELECT compress_chunk(show_chunks('t')); UPDATE t SET b = 2 WHERE tableoid = 0; UPDATE t SET b = 2 WHERE tableoid is null; DROP TABLE t; +-- github issue: 6367 +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE DATABASE test6367; +\c test6367 :ROLE_SUPERUSER +SET client_min_messages = ERROR; +CREATE EXTENSION timescaledb CASCADE; +CREATE TABLE t6367 ( + time timestamptz NOT NULL, + source_id varchar(64) NOT NULL, + label varchar, + data jsonb +); +SELECT table_name FROM create_hypertable('t6367', 'time'); + table_name +------------ + t6367 +(1 row) + +ALTER TABLE t6367 SET(timescaledb.compress, timescaledb.compress_segmentby = 'source_id, label', timescaledb.compress_orderby = 'time'); +INSERT INTO t6367 +SELECT time, source_id, label, '{}' AS data +FROM +generate_series('1990-01-01'::timestamptz, '1990-01-10'::timestamptz, INTERVAL '1 day') AS g1(time), +generate_series(1, 3, 1 ) AS g2(source_id), +generate_series(1, 3, 1 ) AS g3(label); +SELECT compress_chunk(c) FROM show_chunks('t6367') c; + compress_chunk +---------------------------------------- + _timescaledb_internal._hyper_1_1_chunk + _timescaledb_internal._hyper_1_2_chunk +(2 rows) + +DROP INDEX _timescaledb_internal._compressed_hypertable_2_source_id_label__ts_meta_sequence__idx; +-- testcase with no index, should use seq scan +set timescaledb.debug_compression_path_info to on; +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 10 +(1 row) + +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +INFO: Number of compressed rows fetched from table scan: 1. Number of compressed rows filtered: 0. +INFO: Number of compressed rows fetched from table scan: 1. Number of compressed rows filtered: 0. +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 0 +(1 row) + +ROLLBACK; +-- test case with an index which has only one +-- of the segmentby filters +CREATE INDEX source_id_idx ON _timescaledb_internal._compressed_hypertable_2 (source_id); +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 10 +(1 row) + +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +INFO: Index "compress_hyper_2_3_chunk_source_id_idx" is used for scan. +INFO: Number of compressed rows fetched from index: 3. Number of compressed rows filtered by heap filters: 2. +INFO: Index "compress_hyper_2_4_chunk_source_id_idx" is used for scan. +INFO: Number of compressed rows fetched from index: 3. Number of compressed rows filtered by heap filters: 2. +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 0 +(1 row) + +ROLLBACK; +-- test that we are filtering NULL checks +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label IS NULL; + count +------- + 0 +(1 row) + +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label IS NULL; +INFO: Index "compress_hyper_2_3_chunk_source_id_idx" is used for scan. +INFO: Number of compressed rows fetched from index: 3. Number of compressed rows filtered by heap filters: 3. +INFO: Index "compress_hyper_2_4_chunk_source_id_idx" is used for scan. +INFO: Number of compressed rows fetched from index: 3. Number of compressed rows filtered by heap filters: 3. +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label IS NULL; + count +------- + 0 +(1 row) + +ROLLBACK; +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label IS NOT NULL; + count +------- + 30 +(1 row) + +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label IS NOT NULL; +INFO: Index "compress_hyper_2_3_chunk_source_id_idx" is used for scan. +INFO: Number of compressed rows fetched from index: 3. Number of compressed rows filtered by heap filters: 0. +INFO: Index "compress_hyper_2_4_chunk_source_id_idx" is used for scan. +INFO: Number of compressed rows fetched from index: 3. Number of compressed rows filtered by heap filters: 0. +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label IS NOT NULL; + count +------- + 0 +(1 row) + +ROLLBACK; +DROP INDEX _timescaledb_internal.source_id_idx; +-- test case with an index which has multiple same column +CREATE INDEX source_id_source_id_idx ON _timescaledb_internal._compressed_hypertable_2 (source_id, source_id); +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 10 +(1 row) + +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +INFO: Index "compress_hyper_2_3_chunk_source_id_source_id_idx" is used for scan. +INFO: Number of compressed rows fetched from index: 3. Number of compressed rows filtered by heap filters: 2. +INFO: Index "compress_hyper_2_4_chunk_source_id_source_id_idx" is used for scan. +INFO: Number of compressed rows fetched from index: 3. Number of compressed rows filtered by heap filters: 2. +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 0 +(1 row) + +ROLLBACK; +DROP INDEX _timescaledb_internal.source_id_source_id_idx; +-- test using a non-btree index +-- fallback to heap scan +CREATE INDEX brin_source_id_idx ON _timescaledb_internal._compressed_hypertable_2 USING brin (source_id); +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 10 +(1 row) + +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +INFO: Number of compressed rows fetched from table scan: 1. Number of compressed rows filtered: 0. +INFO: Number of compressed rows fetched from table scan: 1. Number of compressed rows filtered: 0. +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 0 +(1 row) + +ROLLBACK; +DROP INDEX _timescaledb_internal.brin_source_id_idx; +-- test using an expression index +-- should fallback to heap scans +CREATE INDEX expr_source_id_idx ON _timescaledb_internal._compressed_hypertable_2 (upper(source_id)); +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 10 +(1 row) + +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +INFO: Number of compressed rows fetched from table scan: 1. Number of compressed rows filtered: 0. +INFO: Number of compressed rows fetched from table scan: 1. Number of compressed rows filtered: 0. +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 0 +(1 row) + +ROLLBACK; +DROP INDEX _timescaledb_internal.expr_source_id_idx; +-- test using a partial index +-- should fallback to heap scans +CREATE INDEX partial_source_id_idx ON _timescaledb_internal._compressed_hypertable_2 (source_id) +WHERE _ts_meta_min_1 > '1990-01-01'::timestamptz; +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 10 +(1 row) + +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +INFO: Number of compressed rows fetched from table scan: 1. Number of compressed rows filtered: 0. +INFO: Number of compressed rows fetched from table scan: 1. Number of compressed rows filtered: 0. +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; + count +------- + 0 +(1 row) + +ROLLBACK; +RESET timescaledb.debug_compression_path_info; +DROP TABLE t6367; +\c :TEST_DBNAME :ROLE_SUPERUSER +DROP DATABASE test6367; diff --git a/tsl/test/sql/compression_update_delete.sql b/tsl/test/sql/compression_update_delete.sql index 9cf44390984..588d7f089b7 100644 --- a/tsl/test/sql/compression_update_delete.sql +++ b/tsl/test/sql/compression_update_delete.sql @@ -1314,3 +1314,96 @@ SELECT compress_chunk(show_chunks('t')); UPDATE t SET b = 2 WHERE tableoid = 0; UPDATE t SET b = 2 WHERE tableoid is null; DROP TABLE t; + +-- github issue: 6367 +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE DATABASE test6367; +\c test6367 :ROLE_SUPERUSER +SET client_min_messages = ERROR; +CREATE EXTENSION timescaledb CASCADE; + +CREATE TABLE t6367 ( + time timestamptz NOT NULL, + source_id varchar(64) NOT NULL, + label varchar, + data jsonb +); +SELECT table_name FROM create_hypertable('t6367', 'time'); + +ALTER TABLE t6367 SET(timescaledb.compress, timescaledb.compress_segmentby = 'source_id, label', timescaledb.compress_orderby = 'time'); + +INSERT INTO t6367 +SELECT time, source_id, label, '{}' AS data +FROM +generate_series('1990-01-01'::timestamptz, '1990-01-10'::timestamptz, INTERVAL '1 day') AS g1(time), +generate_series(1, 3, 1 ) AS g2(source_id), +generate_series(1, 3, 1 ) AS g3(label); + +SELECT compress_chunk(c) FROM show_chunks('t6367') c; +DROP INDEX _timescaledb_internal._compressed_hypertable_2_source_id_label__ts_meta_sequence__idx; +-- testcase with no index, should use seq scan +set timescaledb.debug_compression_path_info to on; +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +ROLLBACK; +-- test case with an index which has only one +-- of the segmentby filters +CREATE INDEX source_id_idx ON _timescaledb_internal._compressed_hypertable_2 (source_id); +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +ROLLBACK; +-- test that we are filtering NULL checks +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label IS NULL; +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label IS NULL; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label IS NULL; +ROLLBACK; +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label IS NOT NULL; +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label IS NOT NULL; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label IS NOT NULL; +ROLLBACK; +DROP INDEX _timescaledb_internal.source_id_idx; +-- test case with an index which has multiple same column +CREATE INDEX source_id_source_id_idx ON _timescaledb_internal._compressed_hypertable_2 (source_id, source_id); +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +ROLLBACK; +DROP INDEX _timescaledb_internal.source_id_source_id_idx; +-- test using a non-btree index +-- fallback to heap scan +CREATE INDEX brin_source_id_idx ON _timescaledb_internal._compressed_hypertable_2 USING brin (source_id); +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +ROLLBACK; +DROP INDEX _timescaledb_internal.brin_source_id_idx; +-- test using an expression index +-- should fallback to heap scans +CREATE INDEX expr_source_id_idx ON _timescaledb_internal._compressed_hypertable_2 (upper(source_id)); +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +ROLLBACK; +DROP INDEX _timescaledb_internal.expr_source_id_idx; +-- test using a partial index +-- should fallback to heap scans +CREATE INDEX partial_source_id_idx ON _timescaledb_internal._compressed_hypertable_2 (source_id) +WHERE _ts_meta_min_1 > '1990-01-01'::timestamptz; +BEGIN; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +UPDATE t6367 SET source_id = '0' WHERE source_id = '2' AND label = '1'; +SELECT count(*) FROM t6367 WHERE source_id = '2' AND label = '1'; +ROLLBACK; +RESET timescaledb.debug_compression_path_info; +DROP TABLE t6367; +\c :TEST_DBNAME :ROLE_SUPERUSER +DROP DATABASE test6367;