Add low-level debugging #256

Merged · 1 commit · Jan 16, 2024
6 changes: 6 additions & 0 deletions deltacat/aws/s3u.py
@@ -383,6 +383,12 @@ def upload_table(
# s3fs may swallow S3 errors - we were probably throttled
raise RetryableError(f"Retry table upload to: {s3_url}") from e
raise NonRetryableError(f"Failed table upload to: {s3_url}") from e
except BaseException as e:
logger.warn(
f"Upload has failed for {s3_url} and content_type={content_type}. Error: {e}",
exc_info=True,
)
raise e
return manifest_entries


4 changes: 2 additions & 2 deletions deltacat/compute/compactor/compaction_session.py
@@ -52,7 +52,7 @@
)
from deltacat.compute.compactor.model.compactor_version import CompactorVersion
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
from deltacat.utils.resources import get_current_process_peak_memory_usage_in_bytes


if importlib.util.find_spec("memray"):
@@ -679,7 +679,7 @@ def _execute_compaction_round(
[m.pyarrow_write_result for m in mat_results]
)

session_peak_memory = get_current_node_peak_memory_usage_in_bytes()
session_peak_memory = get_current_process_peak_memory_usage_in_bytes()
compaction_audit.set_peak_memory_used_bytes_by_compaction_session_process(
session_peak_memory
)
4 changes: 2 additions & 2 deletions deltacat/compute/compactor/steps/dedupe.py
@@ -25,7 +25,7 @@
from deltacat.utils.performance import timed_invocation
from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig
from deltacat.io.object_store import IObjectStore
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
from deltacat.utils.resources import get_current_process_peak_memory_usage_in_bytes

if importlib.util.find_spec("memray"):
import memray
@@ -228,7 +228,7 @@ def _timed_dedupe(
f"{len(mat_bucket_to_dd_idx_obj_id)}"
)

peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
peak_memory_usage_bytes = get_current_process_peak_memory_usage_in_bytes()
return DedupeResult(
mat_bucket_to_dd_idx_obj_id,
np.int64(total_deduped_records),
4 changes: 2 additions & 2 deletions deltacat/compute/compactor/steps/hash_bucket.py
@@ -32,7 +32,7 @@
from deltacat.utils.performance import timed_invocation
from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig
from deltacat.io.object_store import IObjectStore
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
from deltacat.utils.resources import get_current_process_peak_memory_usage_in_bytes

if importlib.util.find_spec("memray"):
import memray
@@ -228,7 +228,7 @@ def _timed_hash_bucket(
delta_file_envelope_groups, num_buckets, num_groups, object_store
)

peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
peak_memory_usage_bytes = get_current_process_peak_memory_usage_in_bytes()
return HashBucketResult(
hash_bucket_group_to_obj_id,
np.int64(total_record_count),
4 changes: 2 additions & 2 deletions deltacat/compute/compactor/steps/materialize.py
@@ -44,7 +44,7 @@
get_current_ray_worker_id,
)
from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
from deltacat.utils.resources import get_current_process_peak_memory_usage_in_bytes

if importlib.util.find_spec("memray"):
import memray
@@ -314,7 +314,7 @@ def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult:
emit_metrics_time = latency
logger.info(f"Materialize task ended in {end - start}s")

peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
peak_memory_usage_bytes = get_current_process_peak_memory_usage_in_bytes()

# Merge all new deltas into one for this materialize bucket index
merged_materialize_result = MaterializeResult.of(
4 changes: 2 additions & 2 deletions deltacat/compute/compactor_v2/compaction_session.py
@@ -41,7 +41,7 @@
CompactionSessionAuditInfo,
)
from deltacat.utils.resources import (
get_current_node_peak_memory_usage_in_bytes,
get_current_process_peak_memory_usage_in_bytes,
)
from deltacat.compute.compactor_v2.utils.task_options import (
hash_bucket_resource_options_provider,
@@ -467,7 +467,7 @@ def merge_input_provider(index, item):
[m.pyarrow_write_result for m in mat_results]
)

session_peak_memory = get_current_node_peak_memory_usage_in_bytes()
session_peak_memory = get_current_process_peak_memory_usage_in_bytes()
compaction_audit.set_peak_memory_used_bytes_by_compaction_session_process(
session_peak_memory
)
65 changes: 41 additions & 24 deletions deltacat/compute/compactor_v2/steps/hash_bucket.py
@@ -27,7 +27,11 @@
from deltacat.utils.common import ReadKwargsProvider
from deltacat.utils.performance import timed_invocation
from deltacat.utils.metrics import emit_timer_metrics
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
from deltacat.utils.resources import (
get_current_process_peak_memory_usage_in_bytes,
ProcessUtilizationOverTimeRange,
)
from deltacat.constants import BYTES_PER_GIBIBYTE

if importlib.util.find_spec("memray"):
import memray
@@ -166,7 +170,10 @@ def _timed_hash_bucket(input: HashBucketInput):
object_store=input.object_store,
)

peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
peak_memory_usage_bytes = get_current_process_peak_memory_usage_in_bytes()
logger.info(
f"Peak memory usage in bytes after hash bucketing: {peak_memory_usage_bytes}"
)
return HashBucketResult(
hash_bucket_group_to_obj_id_tuple,
np.int64(total_size_bytes),
@@ -179,28 +186,38 @@ def _timed_hash_bucket(input: HashBucketInput):

@ray.remote
def hash_bucket(input: HashBucketInput) -> HashBucketResult:
with ProcessUtilizationOverTimeRange() as process_util:
logger.info(f"Starting hash bucket task...")

logger.info(f"Starting hash bucket task...")
hash_bucket_result, duration = timed_invocation(
func=_timed_hash_bucket, input=input
)
# Log node peak memory utilization every 10 seconds
def log_peak_memory():
logger.debug(
f"Process peak memory utilization so far: {process_util.max_memory} bytes "
f"({process_util.max_memory/BYTES_PER_GIBIBYTE} GB)"
)

process_util.schedule_callback(log_peak_memory, 10)

emit_metrics_time = 0.0
if input.metrics_config:
emit_result, latency = timed_invocation(
func=emit_timer_metrics,
metrics_name="hash_bucket",
value=duration,
metrics_config=input.metrics_config,
hash_bucket_result, duration = timed_invocation(
func=_timed_hash_bucket, input=input
)

emit_metrics_time = 0.0
if input.metrics_config:
emit_result, latency = timed_invocation(
func=emit_timer_metrics,
metrics_name="hash_bucket",
value=duration,
metrics_config=input.metrics_config,
)
emit_metrics_time = latency

logger.info(f"Finished hash bucket task...")
return HashBucketResult(
hash_bucket_result[0],
hash_bucket_result[1],
hash_bucket_result[2],
hash_bucket_result[3],
np.double(emit_metrics_time),
hash_bucket_result[5],
)
emit_metrics_time = latency

logger.info(f"Finished hash bucket task...")
return HashBucketResult(
hash_bucket_result[0],
hash_bucket_result[1],
hash_bucket_result[2],
hash_bucket_result[3],
np.double(emit_metrics_time),
hash_bucket_result[5],
)
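
Note (not part of the diff): the utilization-tracking pattern introduced in this file can be reduced to a minimal standalone sketch. Only ProcessUtilizationOverTimeRange, schedule_callback, max_memory, and BYTES_PER_GIBIBYTE come from the changes above; run_task and the sleep are illustrative placeholders.

import logging
import time

from deltacat.constants import BYTES_PER_GIBIBYTE
from deltacat.utils.resources import ProcessUtilizationOverTimeRange

logger = logging.getLogger(__name__)


def run_task() -> None:
    # Track peak process memory for the lifetime of the task.
    with ProcessUtilizationOverTimeRange() as process_util:

        def log_peak_memory():
            logger.debug(
                f"Process peak memory utilization so far: {process_util.max_memory} bytes "
                f"({process_util.max_memory / BYTES_PER_GIBIBYTE} GiB)"
            )

        # Emit a peak-memory snapshot every 10 seconds while the task runs.
        process_util.schedule_callback(log_peak_memory, 10)

        time.sleep(30)  # placeholder for the real task body
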
59 changes: 38 additions & 21 deletions deltacat/compute/compactor_v2/steps/merge.py
@@ -30,7 +30,10 @@

from deltacat.utils.performance import timed_invocation
from deltacat.utils.metrics import emit_timer_metrics
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
from deltacat.utils.resources import (
get_current_process_peak_memory_usage_in_bytes,
ProcessUtilizationOverTimeRange,
)
from deltacat.compute.compactor_v2.utils.primary_key_index import (
generate_pk_hash_column,
hash_group_index_to_hash_bucket_indices,
@@ -44,6 +47,7 @@
interface as unimplemented_deltacat_storage,
)
from deltacat.compute.compactor_v2.utils.dedupe import drop_duplicates
from deltacat.constants import BYTES_PER_GIBIBYTE


if importlib.util.find_spec("memray"):
@@ -436,7 +440,10 @@ def _materialize(
f"{total_dfes_found} != {len(hb_index_to_delta_file_envelopes_list)}"
)

peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
peak_memory_usage_bytes = get_current_process_peak_memory_usage_in_bytes()
logger.info(
f"Peak memory usage in bytes after merge: {peak_memory_usage_bytes}"
)

return MergeResult(
materialized_results,
@@ -449,25 +456,35 @@ def _materialize(

@ray.remote
def merge(input: MergeInput) -> MergeResult:
with ProcessUtilizationOverTimeRange() as process_util:
logger.info(f"Starting merge task...")

# Log node peak memory utilization every 10 seconds
def log_peak_memory():
logger.debug(
f"Process peak memory utilization so far: {process_util.max_memory} bytes "
f"({process_util.max_memory/BYTES_PER_GIBIBYTE} GB)"
)

process_util.schedule_callback(log_peak_memory, 10)

logger.info(f"Starting merge task...")
merge_result, duration = timed_invocation(func=_timed_merge, input=input)
merge_result, duration = timed_invocation(func=_timed_merge, input=input)

emit_metrics_time = 0.0
if input.metrics_config:
emit_result, latency = timed_invocation(
func=emit_timer_metrics,
metrics_name="merge",
value=duration,
metrics_config=input.metrics_config,
emit_metrics_time = 0.0
if input.metrics_config:
emit_result, latency = timed_invocation(
func=emit_timer_metrics,
metrics_name="merge",
value=duration,
metrics_config=input.metrics_config,
)
emit_metrics_time = latency

logger.info(f"Finished merge task...")
return MergeResult(
merge_result[0],
merge_result[1],
merge_result[2],
np.double(emit_metrics_time),
merge_result[4],
)
emit_metrics_time = latency

logger.info(f"Finished merge task...")
return MergeResult(
merge_result[0],
merge_result[1],
merge_result[2],
np.double(emit_metrics_time),
merge_result[4],
)
21 changes: 21 additions & 0 deletions deltacat/tests/utils/test_resources.py
@@ -49,3 +49,24 @@ def test_sanity(self, ray_mock):
self.assertIsNotNone(cu.total_memory_gb_seconds)
self.assertIsNotNone(cu.used_memory_gb_seconds)
self.assertIsNotNone(cu.max_cpu)


class TestProcessUtilizationOverTimeRange(unittest.TestCase):
def test_sanity(self):
from deltacat.utils.resources import ProcessUtilizationOverTimeRange

with ProcessUtilizationOverTimeRange() as nu:
time.sleep(3)
self.assertIsNotNone(nu.max_memory)

def test_callback(self):
from deltacat.utils.resources import ProcessUtilizationOverTimeRange

with ProcessUtilizationOverTimeRange() as nu:

def test_callback():
nu.test_field_set = True

nu.schedule_callback(test_callback, 1)
time.sleep(3)
self.assertTrue(nu.test_field_set)
2 changes: 2 additions & 0 deletions deltacat/utils/daft.py
@@ -66,6 +66,8 @@ def daft_s3_file_to_table(
)
)

logger.debug(f"Preparing to read S3 object from {s3_url} into daft table")

pa_table, latency = timed_invocation(
read_parquet_into_pyarrow,
path=s3_url,
60 changes: 58 additions & 2 deletions deltacat/utils/resources.py
@@ -77,6 +77,7 @@ def __init__(self) -> None:
self.total_memory_gb_seconds = 0.0
self.used_memory_gb_seconds = 0.0
self.max_cpu = 0.0
self.max_memory = 0.0

def __enter__(self) -> Any:
schedule.every().second.do(self._update_resources)
@@ -131,6 +132,11 @@ def _update_resources(self):
+ float(str(cluster_resources["memory"])) / BYTES_PER_GIBIBYTE
)

self.max_memory = max(
self.max_memory,
float(str(cluster_resources["memory"] - available_resources["memory"])),
)

def _run_schedule(self, interval: Optional[float] = 1.0):
cease_continuous_run = threading.Event()

@@ -146,9 +152,9 @@ def run(cls):
return cease_continuous_run


def get_current_node_peak_memory_usage_in_bytes():
def get_current_process_peak_memory_usage_in_bytes():
"""
Returns the peak memory usage of the node in bytes. This method works across
Returns the peak memory usage of the process in bytes. This method works across
Windows, Darwin and Linux platforms.
"""
current_platform = platform.system()
@@ -172,3 +178,53 @@ def get_size_of_object_in_bytes(obj: object) -> float:
if isinstance(obj, (list, tuple, set, frozenset)):
return size + sum(map(get_size_of_object_in_bytes, obj))
return size


class ProcessUtilizationOverTimeRange(AbstractContextManager):
"""
This class can be used to compute the process utilization metrics
which requires us to compute it over time as memory utilization changes.
"""

def __init__(self) -> None:
self.max_memory = 0.0

def __enter__(self) -> Any:
schedule.every().second.do(self._update_resources)
self.stop_run_schedules = self._run_schedule()
return super().__enter__()

def __exit__(
self,
__exc_type: type[BaseException] | None,
__exc_value: BaseException | None,
__traceback: TracebackType | None,
) -> bool | None:
if __exc_value:
logger.error(
f"Error ocurred while calculating process resources: {__exc_value}"
)
self.stop_run_schedules.set()
return super().__exit__(__exc_type, __exc_value, __traceback)

def schedule_callback(self, callback, callback_frequency_in_seconds) -> None:
schedule.every(callback_frequency_in_seconds).seconds.do(callback)

# It is not truly parallel (due to the GIL, Ref: https://wiki.python.org/moin/GlobalInterpreterLock)
# even if we are using the threading library. However, it averages out and gives a very good approximation.
def _update_resources(self):
self.max_memory = get_current_process_peak_memory_usage_in_bytes()

def _run_schedule(self, interval: Optional[float] = 1.0):
cease_continuous_run = threading.Event()

class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(float(str(interval)))

continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
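
Note (not part of the diff): the hunks above only show the rename of get_current_node_peak_memory_usage_in_bytes to get_current_process_peak_memory_usage_in_bytes; the function body itself is collapsed. As a rough orientation for reviewers, a common way to measure per-process peak RSS across platforms looks like the sketch below. It is an assumption about the general approach, not a copy of the deltacat implementation, and the helper name peak_rss_bytes is hypothetical.

import platform


def peak_rss_bytes() -> float:
    # Best-effort peak resident set size (RSS) of the current process, in bytes.
    system = platform.system()
    if system == "Windows":
        import psutil  # the stdlib `resource` module is unavailable on Windows

        return float(psutil.Process().memory_info().peak_wset)

    import resource  # POSIX-only stdlib module

    max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS (Darwin).
    return float(max_rss) if system == "Darwin" else float(max_rss * 1024)

Usage is a single call, e.g. peak_rss_bytes() at the end of a task, mirroring how get_current_process_peak_memory_usage_in_bytes is called in the compaction steps above.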