Skip to content

Commit

Permalink
Add debug logs to capture params used for merge memory calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Yan committed Feb 3, 2024
1 parent 4791337 commit fe0db78
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions deltacat/compute/compactor_v2/utils/task_options.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import botocore
import logging
from typing import Dict, Optional, List, Tuple
from deltacat import logs
from deltacat.types.media import ContentEncoding, ContentType
from deltacat.types.partial_download import PartialParquetParameters
from deltacat.storage import (
Expand All @@ -17,6 +19,8 @@
PARQUET_TO_PYARROW_INFLATION,
)

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


def _get_parquet_type_params_if_exist(
entry: ManifestEntry,
Expand Down Expand Up @@ -133,11 +137,13 @@ def hash_bucket_resource_options_provider(
ray_custom_resources: Optional[Dict] = None,
**kwargs,
) -> Dict:
debug_memory_params = {"hash_bucket_task_index": index}
size_bytes = 0.0
num_rows = 0
total_pk_size = 0

if not item.manifest or not item.manifest.entries:
logger.debug("No manifest entries, skipping memory allocation calculation")
return {"CPU": 0.01}

for entry in item.manifest.entries:
Expand Down Expand Up @@ -165,9 +171,17 @@ def hash_bucket_resource_options_provider(
# total size + pk size + pk hash column + hash bucket index column
# Refer to hash_bucket step for more details.
total_memory = size_bytes + total_pk_size + num_rows * 20 + num_rows * 4
debug_memory_params["size_bytes"] = size_bytes
debug_memory_params["num_rows"] = num_rows
debug_memory_params["total_pk_size"] = total_pk_size
debug_memory_params["total_memory"] = total_memory

# Consider buffer
total_memory = total_memory * (1 + TOTAL_MEMORY_BUFFER_PERCENTAGE / 100.0)
debug_memory_params["total_memory_with_buffer"] = total_memory
logger.debug(
f"Params used for calculating hash bucketing memory: {debug_memory_params}"
)

return get_task_options(0.01, total_memory, ray_custom_resources)

Expand All @@ -186,10 +200,13 @@ def merge_resource_options_provider(
deltacat_storage_kwargs: Optional[Dict] = {},
**kwargs,
) -> Dict:
debug_memory_params = {"merge_task_index": index}
hb_group_idx = item[0]

data_size = hash_group_size_bytes.get(hb_group_idx, 0)
num_rows = hash_group_num_rows.get(hb_group_idx, 0)
debug_memory_params["data_size_from_hash_group"] = data_size
debug_memory_params["num_rows_from_hash_group"] = num_rows

# upper bound for pk size of incremental
pk_size_bytes = data_size
Expand All @@ -205,10 +222,13 @@ def merge_resource_options_provider(
round_completion_info.compacted_pyarrow_write_result.pyarrow_bytes
/ round_completion_info.compacted_pyarrow_write_result.file_bytes
)
debug_memory_params["previous_inflation"] = previous_inflation

average_record_size = (
round_completion_info.compacted_pyarrow_write_result.pyarrow_bytes
/ round_completion_info.compacted_pyarrow_write_result.records
)
debug_memory_params["average_record_size"] = average_record_size

iterable = hash_group_index_to_hash_bucket_indices(
hb_group_idx, round_completion_info.hash_bucket_count, num_hash_groups
Expand Down Expand Up @@ -256,7 +276,14 @@ def merge_resource_options_provider(
+ num_rows * 20
+ incremental_index_array_size
)
debug_memory_params["data_size"] = data_size
debug_memory_params["num_rows"] = num_rows
debug_memory_params["pk_size_bytes"] = pk_size_bytes
debug_memory_params["incremental_index_array_size"] = incremental_index_array_size
debug_memory_params["total_memory"] = total_memory

total_memory = total_memory * (1 + TOTAL_MEMORY_BUFFER_PERCENTAGE / 100.0)
debug_memory_params["total_memory_with_buffer"] = total_memory
logger.debug(f"Params used for calculating merge memory: {debug_memory_params}")

return get_task_options(0.01, total_memory, ray_custom_resources)

0 comments on commit fe0db78

Please sign in to comment.