Skip to content

Commit

Permalink
Use table scan API in compression code
Browse files Browse the repository at this point in the history
Refactor the compression code to only use the table scan API when
scanning relations instead of a mix of the table and heap scan
APIs. The table scan API is a higher-level API and recommended as it
works for any type of relation and uses table slots directly, which
means that in some cases a full heap tuple need not be materialized.
  • Loading branch information
erimatnor authored and jnidzwetzki committed Jan 3, 2024
1 parent 63468f6 commit 4d62243
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 123 deletions.
61 changes: 27 additions & 34 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* compress and decompress chunks
*/
#include <postgres.h>
#include <access/tableam.h>
#include <access/xact.h>
#include <catalog/dependency.h>
#include <commands/tablecmds.h>
Expand Down Expand Up @@ -36,6 +37,7 @@
#include "hypercube.h"
#include "hypertable.h"
#include "hypertable_cache.h"
#include "ts_catalog/catalog.h"
#include "ts_catalog/continuous_agg.h"
#include "ts_catalog/hypertable_compression.h"
#include "ts_catalog/compression_chunk_size.h"
Expand Down Expand Up @@ -934,6 +936,7 @@ bool
tsl_recompress_chunk_wrapper(Chunk *uncompressed_chunk)
{
Oid uncompressed_chunk_relid = uncompressed_chunk->table_id;

if (ts_chunk_is_unordered(uncompressed_chunk))
{
if (!decompress_chunk_impl(uncompressed_chunk->hypertable_relid,
Expand Down Expand Up @@ -1129,28 +1132,23 @@ fetch_unmatched_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tuples
Relation uncompressed_chunk_rel,
bool *unmatched_rows_exist)
{
TableScanDesc heapScan;
HeapTuple uncompressed_tuple;
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);
TableScanDesc scan;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
Snapshot snapshot = GetLatestSnapshot();

heapScan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, NULL);
TupleTableSlot *heap_tuple_slot =
MakeTupleTableSlot(uncompressed_rel_tupdesc, &TTSOpsHeapTuple);
scan = table_beginscan(uncompressed_chunk_rel, snapshot, 0, NULL);

while ((uncompressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL)
while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
if (!(*unmatched_rows_exist))
*unmatched_rows_exist = true;

ExecStoreHeapTuple(uncompressed_tuple, heap_tuple_slot, false);
slot_getallattrs(heap_tuple_slot);

tuplesort_puttupleslot(segment_tuplesortstate, heap_tuple_slot);

simple_heap_delete(uncompressed_chunk_rel, &uncompressed_tuple->t_self);
slot_getallattrs(slot);
tuplesort_puttupleslot(segment_tuplesortstate, slot);
simple_table_tuple_delete(uncompressed_chunk_rel, &slot->tts_tid, snapshot);
}
ExecDropSingleTupleTableSlot(heap_tuple_slot);
table_endscan(heapScan);
ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);
}

static bool
Expand All @@ -1159,9 +1157,8 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
Relation uncompressed_chunk_rel,
CompressedSegmentInfo **current_segment)
{
TableScanDesc heapScan;
HeapTuple uncompressed_tuple;
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);
TableScanDesc scan;
Snapshot snapshot;
int index = 0;
int nsegbycols_nonnull = 0;
Bitmapset *null_segbycols = NULL;
Expand Down Expand Up @@ -1202,21 +1199,18 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
index++;
}

heapScan =
table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), nsegbycols_nonnull, scankey);
TupleTableSlot *heap_tuple_slot =
MakeTupleTableSlot(uncompressed_rel_tupdesc, &TTSOpsHeapTuple);
snapshot = GetLatestSnapshot();
scan = table_beginscan(uncompressed_chunk_rel, snapshot, nsegbycols_nonnull, scankey);
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

while ((uncompressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL)
while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
bool valid = true;
/* check for NULL values in this segment manually */
for (int attno = bms_next_member(null_segbycols, -1); attno >= 0;
attno = bms_next_member(null_segbycols, attno))
{
if (!heap_attisnull(uncompressed_tuple,
attno,
RelationGetDescr(uncompressed_chunk_rel)))
if (!slot_attisnull(slot, attno))
{
valid = false;
break;
Expand All @@ -1225,16 +1219,15 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
if (valid)
{
matching_exist = true;
ExecStoreHeapTuple(uncompressed_tuple, heap_tuple_slot, false);
slot_getallattrs(heap_tuple_slot);
tuplesort_puttupleslot(segment_tuplesortstate, heap_tuple_slot);
/* simple_heap_delete since we don't expect concurrent updates, have exclusive lock on
* the relation */
simple_heap_delete(uncompressed_chunk_rel, &uncompressed_tuple->t_self);
slot_getallattrs(slot);
tuplesort_puttupleslot(segment_tuplesortstate, slot);
/* simple_table_tuple_delete since we don't expect concurrent
* updates, have exclusive lock on the relation */
simple_table_tuple_delete(uncompressed_chunk_rel, &slot->tts_tid, snapshot);
}
}
ExecDropSingleTupleTableSlot(heap_tuple_slot);
table_endscan(heapScan);
ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);

if (null_segbycols != NULL)
pfree(null_segbycols);
Expand Down
Loading

0 comments on commit 4d62243

Please sign in to comment.