Skip to content

Commit

Permalink
Make the bulk decompression function depend on PG type
Browse files Browse the repository at this point in the history
This is a refactoring to enable bulk decompression of array- and
dictionary-compressed text columns, but not of other types. It currently
has no effect on behavior.
  • Loading branch information
akuzm committed Dec 15, 2023
1 parent 37ece57 commit 3d8ec1e
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 7 deletions.
2 changes: 1 addition & 1 deletion tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ DecompressionIterator *(*tsl_get_decompression_iterator_init(CompressionAlgorith
}

DecompressAllFunction
tsl_get_decompress_all_function(CompressionAlgorithm algorithm)
tsl_get_decompress_all_function(CompressionAlgorithm algorithm, Oid type)
{
if (algorithm >= _END_COMPRESSION_ALGORITHMS)
elog(ERROR, "invalid compression algorithm %d", algorithm);
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ extern void decompress_chunk(Oid in_table, Oid out_table);
extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
CompressionAlgorithm algorithm, bool reverse))(Datum, Oid element_type);

extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithm algorithm);
extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithm algorithm,
Oid type);

typedef struct Chunk Chunk;
typedef struct ChunkInsertState ChunkInsertState;
Expand Down
4 changes: 2 additions & 2 deletions tsl/src/compression/decompress_test_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
* For routine fuzzing, we only run bulk decompression to make it faster
* and the coverage space smaller.
*/
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
return 0;
}
Expand All @@ -53,7 +53,7 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
* the row-by-row is old and stable.
*/
ArrowArray *arrow = NULL;
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
if (decompress_all)
{
arrow = decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/nodes/decompress_chunk/compressed_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
}

DecompressAllFunction decompress_all =
tsl_get_decompress_all_function(header->compression_algorithm);
tsl_get_decompress_all_function(header->compression_algorithm,
column_description->typid);
Assert(decompress_all != NULL);

MemoryContext context_before_decompression =
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/nodes/decompress_chunk/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,8 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
ArrowArray *arrow = NULL;

DecompressAllFunction decompress_all =
tsl_get_decompress_all_function(header->compression_algorithm);
tsl_get_decompress_all_function(header->compression_algorithm,
column_description->typid);
Assert(decompress_all != NULL);

MemoryContext context_before_decompression =
Expand Down
10 changes: 9 additions & 1 deletion tsl/src/nodes/decompress_chunk/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,22 @@ build_decompression_map(PlannerInfo *root, DecompressChunkPath *path, List *scan
lappend_int(path->decompression_map, destination_attno_in_uncompressed_chunk);
path->is_segmentby_column = lappend_int(path->is_segmentby_column, is_segment);

/*
* Determine if we can use bulk decompression for this column.
*/
Oid typoid = get_atttype(path->info->chunk_rte->relid, chunk_attno);
const bool bulk_decompression_possible =
!is_segment && destination_attno_in_uncompressed_chunk > 0 &&
tsl_get_decompress_all_function(compression_get_default_algorithm(typoid)) != NULL;
tsl_get_decompress_all_function(compression_get_default_algorithm(typoid), typoid) !=
NULL;
path->have_bulk_decompression_columns |= bulk_decompression_possible;
path->bulk_decompression_column =
lappend_int(path->bulk_decompression_column, bulk_decompression_possible);

/*
* Save information about decompressed columns in uncompressed chunk
* for planning of vectorized filters.
*/
if (destination_attno_in_uncompressed_chunk > 0)
{
path->uncompressed_chunk_attno_to_compression_info
Expand Down

0 comments on commit 3d8ec1e

Please sign in to comment.