Simplify empty incremental delta cases during local merge
rkenmi committed Feb 26, 2024
1 parent 2e6985f commit 8cac653
Showing 2 changed files with 0 additions and 40 deletions.
deltacat/compute/compactor_v2/model/merge_file_group.py (3 changes: 0 additions & 3 deletions)
@@ -129,9 +129,6 @@ def create(self) -> List[MergeFileGroup]:
         if not self._loaded_deltas:
             self._read_deltas_locally()
 
-        if not self._dfe_groups:
-            return []
-
         # Since hash bucketing is skipped for local merges, we use a fixed index here.
         return [
             MergeFileGroup.of(
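Why the guard was safe to drop: create() builds its return value by iterating self._dfe_groups, and a comprehension over an empty collection already evaluates to an empty list, so the explicit early return was redundant. A minimal runnable sketch of that behavior, assuming _dfe_groups is a dict-like mapping (the stub class and its constructor are illustrative; only the _dfe_groups attribute, create(), and MergeFileGroup appear in the diff):

from typing import Any, Dict, List


class _LocalMergeFileGroupsStub:
    """Illustrative stand-in for the provider in merge_file_group.py."""

    def __init__(self, dfe_groups: Dict[int, List[Any]]):
        self._dfe_groups = dfe_groups

    def create(self) -> List[Any]:
        # Iterating an empty mapping yields [], so no explicit
        # `if not self._dfe_groups: return []` guard is needed.
        return [dfes for dfes in self._dfe_groups.values()]


assert _LocalMergeFileGroupsStub({}).create() == []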
deltacat/compute/compactor_v2/steps/merge.py (37 changes: 0 additions & 37 deletions)
@@ -315,40 +315,6 @@ def _compact_tables(
     return table, incremental_len, total_deduped_records
 
 
-def _copy_previous_compacted_table(input: MergeInput) -> List[MaterializeResult]:
-    materialized_results: List[MaterializeResult] = []
-    if input.round_completion_info:
-        old_manifest = input.deltacat_storage.get_delta_manifest(
-            input.round_completion_info.compacted_delta_locator,
-            **input.deltacat_storage_kwargs,
-        )
-
-        new_manifest = Manifest.of(entries=old_manifest.entries, uuid=str(uuid4()))
-        delta = Delta.of(
-            locator=DeltaLocator.of(input.write_to_partition.locator),
-            delta_type=DeltaType.UPSERT,
-            meta=new_manifest.meta,
-            manifest=new_manifest,
-            previous_stream_position=input.write_to_partition.stream_position,
-            properties={},
-        )
-        referenced_pyarrow_write_result = PyArrowWriteResult.of(
-            len(new_manifest.entries),
-            new_manifest.meta.source_content_length,
-            new_manifest.meta.content_length,
-            new_manifest.meta.record_count,
-        )
-        materialize_result = MaterializeResult.of(
-            delta=delta,
-            task_index=input.merge_task_index,
-            pyarrow_write_result=referenced_pyarrow_write_result,
-            referenced_pyarrow_write_result=referenced_pyarrow_write_result,
-        )
-
-        materialized_results.append(materialize_result)
-    return materialized_results
-
-
 def _copy_manifests_from_hash_bucketing(
     input: MergeInput, hb_index_copy_by_reference_ids: List[int]
 ) -> List[MaterializeResult]:
@@ -397,9 +363,6 @@ def _timed_merge(input: MergeInput) -> MergeResult:
                 merge_utils.materialize(input, merge_file_group.hb_index, [table])
             )
 
-        if not merge_file_groups:
-            materialized_results.extend(_copy_previous_compacted_table(input))
-
         if hb_index_copy_by_ref_ids:
             materialized_results.extend(
                 _copy_manifests_from_hash_bucketing(input, hb_index_copy_by_ref_ids)
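The deleted _copy_previous_compacted_table helper had only one call site in this diff, the `if not merge_file_groups` branch removed from _timed_merge, so both go together; per the commit title, an empty incremental delta presumably no longer reaches the merge step at all. A minimal sketch of the simplified result-collection flow, with hypothetical function and callback names (only merge_file_groups, hb_index_copy_by_ref_ids, and materialized_results appear in the diff):

from typing import Any, Callable, List


def collect_materialized_results(
    merge_file_groups: List[Any],
    hb_index_copy_by_ref_ids: List[int],
    materialize: Callable[[Any], Any],
    copy_manifests_by_ref: Callable[[List[int]], List[Any]],
) -> List[Any]:
    materialized_results: List[Any] = []
    # With the fallback removed, an empty merge_file_groups means the loop
    # never runs and no per-group results are materialized here.
    for merge_file_group in merge_file_groups:
        materialized_results.append(materialize(merge_file_group))
    # The hash-bucketing copy-by-reference path is unchanged by this commit.
    if hb_index_copy_by_ref_ids:
        materialized_results.extend(copy_manifests_by_ref(hb_index_copy_by_ref_ids))
    return materialized_results


# Empty input now yields an empty result list instead of a copied manifest
# referencing the previous compacted table.
assert collect_materialized_results([], [], lambda g: g, lambda ids: ids) == []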
