Skip hash bucketing for small partitions #264

Merged Feb 26, 2024 (4 commits)

Changes from 3 commits
328 changes: 175 additions & 153 deletions deltacat/compute/compactor_v2/compaction_session.py

Large diffs are not rendered by default.

216 changes: 216 additions & 0 deletions deltacat/compute/compactor_v2/model/merge_file_group.py
@@ -0,0 +1,216 @@
# Allow classes to use self-referencing Type hints in Python 3.7.
from __future__ import annotations

import logging
import time
from abc import ABC, abstractmethod
from collections import defaultdict

from deltacat.utils.common import ReadKwargsProvider
from ray.types import ObjectRef

from deltacat.compute.compactor.model.delta_file_envelope import DeltaFileEnvelopeGroups
from deltacat.compute.compactor_v2.utils.delta import read_delta_file_envelopes

from deltacat.compute.compactor_v2.utils.primary_key_index import (
hash_group_index_to_hash_bucket_indices,
)

from deltacat.storage import interface as unimplemented_deltacat_storage

from deltacat.io.object_store import IObjectStore

from deltacat import logs

from deltacat.compute.compactor import DeltaFileEnvelope, DeltaAnnotated

from typing import List, Optional

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


class MergeFileGroup(dict):
    @staticmethod
    def of(hb_index: int, dfe_groups: Optional[List[List[DeltaFileEnvelope]]] = None):
        """
        Creates a container with delta file envelope groupings and other
        additional properties used primarily for the merging step.

        Args:
            hb_index: The hash bucket index corresponding to the delta file envelope groups.
            dfe_groups: A list of delta file envelope groups.
                If not present, the provided hash bucket index is a copy-by-reference
                candidate during the merge step.

        Returns:
            A dict

        """
        d = MergeFileGroup()
        d["hb_index"] = hb_index
        d["dfe_groups"] = dfe_groups
        return d

    @property
    def dfe_groups(self) -> Optional[List[List[DeltaFileEnvelope]]]:
        return self["dfe_groups"]

    @property
    def hb_index(self) -> int:
        return self["hb_index"]


class MergeFileGroupsProvider(ABC):
    @abstractmethod
    def create(self) -> List[MergeFileGroup]:
        """
        Creates a list of merge file groups.

        Returns: a list of merge file groups.

        """
        raise NotImplementedError("Method not implemented")

    @property
    @abstractmethod
    def hash_group_index(self):
        raise NotImplementedError("Method not implemented")


class LocalMergeFileGroupsProvider(MergeFileGroupsProvider):
    """
    A factory class for producing merge file groups given local delta file envelopes.
    """

    LOCAL_HASH_BUCKET_INDEX = 0
    LOCAL_HASH_GROUP_INDEX = 0

    def __init__(
        self,
        uniform_deltas: List[DeltaAnnotated],
        read_kwargs_provider: Optional[ReadKwargsProvider],
        deltacat_storage=unimplemented_deltacat_storage,
        deltacat_storage_kwargs: Optional[dict] = None,
    ):
        self._deltas = uniform_deltas
        self._read_kwargs_provider = read_kwargs_provider
        self._deltacat_storage = deltacat_storage
        self._deltacat_storage_kwargs = deltacat_storage_kwargs
        self._loaded_deltas = False

    def _read_deltas_locally(self):
        local_dfe_list = []
        input_records_count = 0
        uniform_deltas = self._deltas
        logger.info(f"Getting {len(uniform_deltas)} DFE Tasks.")
        dfe_start = time.monotonic()
        for annotated_delta in uniform_deltas:
            (
                delta_file_envelopes,
                total_record_count,
                total_size_bytes,
            ) = read_delta_file_envelopes(
                annotated_delta,
                self._read_kwargs_provider,
                self._deltacat_storage,
                self._deltacat_storage_kwargs,
            )
            if delta_file_envelopes:
                local_dfe_list.extend(delta_file_envelopes)
                input_records_count += total_record_count
        dfe_end = time.monotonic()
        logger.info(
            f"Retrieved {len(local_dfe_list)} DFE Tasks in {dfe_end - dfe_start}s."
        )

        self._dfe_groups = [local_dfe_list] if len(local_dfe_list) > 0 else None
        self._loaded_deltas = True

    def create(self) -> List[MergeFileGroup]:
        if not self._loaded_deltas:
            self._read_deltas_locally()

        if not self._dfe_groups:
            return []

        # Since hash bucketing is skipped for local merges, we use a fixed index here.
        return [
            MergeFileGroup.of(
                hb_index=LocalMergeFileGroupsProvider.LOCAL_HASH_BUCKET_INDEX,
                dfe_groups=self._dfe_groups,
            )
        ]

    @property
    def hash_group_index(self):
        return LocalMergeFileGroupsProvider.LOCAL_HASH_GROUP_INDEX


class RemoteMergeFileGroupsProvider(MergeFileGroupsProvider):
    """
    A factory class for producing merge file groups given delta file envelope object refs
    and hash bucketing parameters. Delta file envelopes are pulled from the object store
    remotely and loaded as in-memory pyarrow tables.
    """

    def __init__(
        self,
        hash_group_index: int,
        dfe_groups_refs: List[ObjectRef[DeltaFileEnvelopeGroups]],
        hash_bucket_count: int,
        num_hash_groups: int,
        object_store: IObjectStore,
    ):
        self.hash_bucket_count = hash_bucket_count
        self.num_hash_groups = num_hash_groups
        self.object_store = object_store
        self._hash_group_index = hash_group_index
        self._dfe_groups_refs = dfe_groups_refs
        self._dfe_groups = []
        self._loaded_from_object_store = False

    def _load_deltas_from_object_store(self):
        delta_file_envelope_groups_list = self.object_store.get_many(
            self._dfe_groups_refs
        )
        hb_index_to_delta_file_envelopes_list = defaultdict(list)
        for delta_file_envelope_groups in delta_file_envelope_groups_list:
            assert self.hash_bucket_count == len(delta_file_envelope_groups), (
                f"The hash bucket count must match the delta file envelope group count: "
                f"{self.hash_bucket_count} != {len(delta_file_envelope_groups)}"
            )

            for hb_idx, dfes in enumerate(delta_file_envelope_groups):
                if dfes:
                    hb_index_to_delta_file_envelopes_list[hb_idx].append(dfes)
        valid_hb_indices_iterable = hash_group_index_to_hash_bucket_indices(
            self.hash_group_index, self.hash_bucket_count, self.num_hash_groups
        )

        total_dfes_found = 0
        dfe_list_groups = []
        for hb_idx in valid_hb_indices_iterable:
            dfe_list = hb_index_to_delta_file_envelopes_list.get(hb_idx)
            if dfe_list:
                total_dfes_found += 1
                dfe_list_groups.append(
                    MergeFileGroup.of(hb_index=hb_idx, dfe_groups=dfe_list)
                )
            else:
                dfe_list_groups.append(MergeFileGroup.of(hb_index=hb_idx))

        assert total_dfes_found == len(hb_index_to_delta_file_envelopes_list), (
            "The total dfe list does not match the input dfes from hash bucket: "
            f"{total_dfes_found} != {len(hb_index_to_delta_file_envelopes_list)}"
        )
        self._dfe_groups = dfe_list_groups
        self._loaded_from_object_store = True

    def create(self) -> List[MergeFileGroup]:
        if not self._loaded_from_object_store:
            self._load_deltas_from_object_store()

        return self._dfe_groups

    @property
    def hash_group_index(self):
        return self._hash_group_index
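Reviewer note: for orientation, here is a minimal sketch (not part of the PR) of how a compaction session might choose between the two providers. The constructor signatures match the new module above; the `choose_provider` function, its arguments, and the `small_partition` flag are illustrative assumptions.

```python
# Hypothetical selection logic; only the provider constructors below are
# taken from this PR -- everything else is illustrative.
from deltacat.compute.compactor_v2.model.merge_file_group import (
    LocalMergeFileGroupsProvider,
    RemoteMergeFileGroupsProvider,
)


def choose_provider(
    uniform_deltas,  # List[DeltaAnnotated] for the partition
    dfe_groups_refs,  # object refs produced by the hash bucketing step
    object_store,  # an IObjectStore implementation
    small_partition: bool,
):
    if small_partition:
        # Small partitions skip hash bucketing entirely: deltas are read
        # locally and merged under a single fixed hash bucket index (0).
        return LocalMergeFileGroupsProvider(
            uniform_deltas=uniform_deltas,
            read_kwargs_provider=None,
        )
    # Otherwise keep the distributed path: DFE groups were produced by the
    # hash bucketing step and are fetched from the object store per hash group.
    return RemoteMergeFileGroupsProvider(
        hash_group_index=0,
        dfe_groups_refs=dfe_groups_refs,
        hash_bucket_count=8,
        num_hash_groups=1,
        object_store=object_store,
    )
```

Either way, the merge step only calls `provider.create()` and iterates the returned `MergeFileGroup`s, which is what lets the session swap strategies without changing the merge task itself.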
32 changes: 8 additions & 24 deletions deltacat/compute/compactor_v2/model/merge_input.py
@@ -1,7 +1,10 @@
 from __future__ import annotations

-from ray.types import ObjectRef
 from typing import Dict, List, Optional, Any
+
+from deltacat.compute.compactor_v2.model.merge_file_group import (
+    MergeFileGroupsProvider,
+)
 from deltacat.utils.metrics import MetricsConfig
 from deltacat.utils.common import ReadKwargsProvider
 from deltacat.io.object_store import IObjectStore
@@ -16,19 +19,15 @@
 )
 from deltacat.types.media import ContentType
 from deltacat.compute.compactor.model.round_completion_info import RoundCompletionInfo
-from deltacat.compute.compactor.model.delta_file_envelope import DeltaFileEnvelopeGroups


 class MergeInput(Dict):
     @staticmethod
     def of(
-        dfe_groups_refs: List[ObjectRef[DeltaFileEnvelopeGroups]],
+        merge_file_groups_provider: MergeFileGroupsProvider,
         write_to_partition: Partition,
         compacted_file_content_type: ContentType,
         primary_keys: List[str],
-        hash_group_index: int,
-        num_hash_groups: int,
-        hash_bucket_count: int,
         drop_duplicates: Optional[bool] = DROP_DUPLICATES,
         sort_keys: Optional[List[SortKey]] = None,
         merge_task_index: Optional[int] = 0,
@@ -44,13 +43,10 @@ def of(
     ) -> MergeInput:

         result = MergeInput()
-        result["dfe_groups_refs"] = dfe_groups_refs
+        result["merge_file_groups_provider"] = merge_file_groups_provider
         result["write_to_partition"] = write_to_partition
         result["compacted_file_content_type"] = compacted_file_content_type
         result["primary_keys"] = primary_keys
-        result["hash_group_index"] = hash_group_index
-        result["num_hash_groups"] = num_hash_groups
-        result["hash_bucket_count"] = hash_bucket_count
         result["drop_duplicates"] = drop_duplicates
         result["sort_keys"] = sort_keys
         result["merge_task_index"] = merge_task_index
@@ -67,8 +63,8 @@ def of(
         return result

     @property
-    def dfe_groups_refs(self) -> List[ObjectRef[DeltaFileEnvelopeGroups]]:
-        return self["dfe_groups_refs"]
+    def merge_file_groups_provider(self) -> MergeFileGroupsProvider:
+        return self["merge_file_groups_provider"]

     @property
     def write_to_partition(self) -> Partition:
@@ -82,18 +78,6 @@ def compacted_file_content_type(self) -> ContentType:
     def primary_keys(self) -> List[str]:
         return self["primary_keys"]

-    @property
-    def hash_group_index(self) -> int:
-        return self["hash_group_index"]
-
-    @property
-    def num_hash_groups(self) -> int:
-        return self["num_hash_groups"]
-
-    @property
-    def hash_bucket_count(self) -> int:
-        return self["hash_bucket_count"]
-
     @property
     def drop_duplicates(self) -> int:
         return self["drop_duplicates"]
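Reviewer note: the `MergeInput` change above follows deltacat's dict-backed model convention, i.e. a `Dict` subclass with a static `of()` constructor and read-only properties, which keeps instances picklable like plain dicts (useful when passing inputs to Ray tasks). A minimal self-contained sketch of the pattern; the `ExampleInput` class is hypothetical, not a deltacat API:

```python
from __future__ import annotations

from typing import Dict


class ExampleInput(Dict):
    @staticmethod
    def of(name: str, count: int) -> ExampleInput:
        # Store all state in the underlying dict so the object serializes
        # exactly like a plain dict.
        result = ExampleInput()
        result["name"] = name
        result["count"] = count
        return result

    @property
    def name(self) -> str:
        return self["name"]

    @property
    def count(self) -> int:
        return self["count"]


inp = ExampleInput.of(name="merge", count=3)
assert inp.name == "merge" and inp["count"] == 3  # dict access still works
```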
1 change: 1 addition & 0 deletions deltacat/compute/compactor_v2/model/merge_result.py
@@ -6,6 +6,7 @@

 class MergeResult(NamedTuple):
     materialize_results: List[MaterializeResult]
+    input_record_count: np.int64
     deduped_record_count: np.int64
     peak_memory_usage_bytes: np.double
     telemetry_time_in_seconds: np.double
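Reviewer note: carrying `input_record_count` through `MergeResult` lets the session relate merge output back to its input. A tiny illustrative sketch; `MergeResultSketch` is a hypothetical stand-in showing only a subset of the real type's fields:

```python
import numpy as np
from typing import List, NamedTuple


class MergeResultSketch(NamedTuple):
    # Illustrative subset of MergeResult's fields.
    materialize_results: List[dict]
    input_record_count: np.int64
    deduped_record_count: np.int64


result = MergeResultSketch(
    materialize_results=[],
    input_record_count=np.int64(1_000),
    deduped_record_count=np.int64(950),
)
# Having the input count alongside the dedupe count lets callers
# sanity-check the merge, e.g. verifying counts reconcile across tasks.
print(int(result.input_record_count), int(result.deduped_record_count))
```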
56 changes: 2 additions & 54 deletions deltacat/compute/compactor_v2/steps/hash_bucket.py
@@ -5,7 +5,6 @@
 from typing import List, Optional, Tuple
 from deltacat.compute.compactor_v2.model.hash_bucket_input import HashBucketInput
 import numpy as np
-import pyarrow as pa
 import ray
 from deltacat import logs
 from deltacat.compute.compactor import (
@@ -14,12 +13,12 @@
 )
 from deltacat.compute.compactor.model.delta_file_envelope import DeltaFileEnvelopeGroups
 from deltacat.compute.compactor_v2.model.hash_bucket_result import HashBucketResult
+from deltacat.compute.compactor_v2.utils.delta import read_delta_file_envelopes
 from deltacat.compute.compactor_v2.utils.primary_key_index import (
     group_hash_bucket_indices,
     group_by_pk_hash_bucket,
 )
 from deltacat.storage import interface as unimplemented_deltacat_storage
-from deltacat.types.media import StorageType
 from deltacat.utils.ray_utils.runtime import (
     get_current_ray_task_id,
     get_current_ray_worker_id,
@@ -39,57 +38,6 @@
 logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


-def _read_delta_file_envelopes(
-    annotated_delta: DeltaAnnotated,
-    read_kwargs_provider: Optional[ReadKwargsProvider],
-    deltacat_storage=unimplemented_deltacat_storage,
-    deltacat_storage_kwargs: Optional[dict] = None,
-) -> Tuple[Optional[List[DeltaFileEnvelope]], int, int]:
-
-    tables = deltacat_storage.download_delta(
-        annotated_delta,
-        max_parallelism=1,
-        file_reader_kwargs_provider=read_kwargs_provider,
-        storage_type=StorageType.LOCAL,
-        **deltacat_storage_kwargs,
-    )
-    annotations = annotated_delta.annotations
-    assert (
-        len(tables) == len(annotations),
-        f"Unexpected Error: Length of downloaded delta manifest tables "
-        f"({len(tables)}) doesn't match the length of delta manifest "
-        f"annotations ({len(annotations)}).",
-    )
-    if not tables:
-        return None, 0, 0
-
-    delta_stream_position = annotations[0].annotation_stream_position
-    delta_type = annotations[0].annotation_delta_type
-
-    for annotation in annotations:
-        assert annotation.annotation_stream_position == delta_stream_position, (
-            f"Annotation stream position does not match - {annotation.annotation_stream_position} "
-            f"!= {delta_stream_position}"
-        )
-        assert annotation.annotation_delta_type == delta_type, (
-            f"Annotation delta type does not match - {annotation.annotation_delta_type} "
-            f"!= {delta_type}"
-        )
-
-    delta_file_envelopes = []
-    table = pa.concat_tables(tables)
-    total_record_count = len(table)
-    total_size_bytes = int(table.nbytes)
-
-    delta_file = DeltaFileEnvelope.of(
-        stream_position=delta_stream_position,
-        delta_type=delta_type,
-        table=table,
-    )
-    delta_file_envelopes.append(delta_file)
-    return delta_file_envelopes, total_record_count, total_size_bytes
-
-
 def _group_file_records_by_pk_hash_bucket(
     annotated_delta: DeltaAnnotated,
     num_hash_buckets: int,
@@ -103,7 +51,7 @@ def _group_file_records_by_pk_hash_bucket(
         delta_file_envelopes,
         total_record_count,
         total_size_bytes,
-    ) = _read_delta_file_envelopes(
+    ) = read_delta_file_envelopes(
        annotated_delta,
        read_kwargs_provider,
        deltacat_storage,
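Reviewer note: the removed helper's first `assert` wraps its condition and message in parentheses with a trailing comma, which makes the operand a tuple, and a non-empty tuple is always truthy, so the check never fires. If the helper was moved verbatim into `deltacat/compute/compactor_v2/utils/delta.py`, the same would apply there. A corrected form would be:

```python
# Condition and message as the two operands of `assert`, not a single tuple:
assert len(tables) == len(annotations), (
    f"Unexpected Error: Length of downloaded delta manifest tables "
    f"({len(tables)}) doesn't match the length of delta manifest "
    f"annotations ({len(annotations)})."
)
```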