diff --git a/deltacat/compute/compactor_v2/model/merge_file_group.py b/deltacat/compute/compactor_v2/model/merge_file_group.py index 4a7b7684..5f8e65b2 100644 --- a/deltacat/compute/compactor_v2/model/merge_file_group.py +++ b/deltacat/compute/compactor_v2/model/merge_file_group.py @@ -129,9 +129,6 @@ def create(self) -> List[MergeFileGroup]: if not self._loaded_deltas: self._read_deltas_locally() - if not self._dfe_groups: - return [] - # Since hash bucketing is skipped for local merges, we use a fixed index here. return [ MergeFileGroup.of( diff --git a/deltacat/compute/compactor_v2/steps/merge.py b/deltacat/compute/compactor_v2/steps/merge.py index bff4e133..ca0a2679 100644 --- a/deltacat/compute/compactor_v2/steps/merge.py +++ b/deltacat/compute/compactor_v2/steps/merge.py @@ -315,40 +315,6 @@ def _compact_tables( return table, incremental_len, total_deduped_records -def _copy_previous_compacted_table(input: MergeInput) -> List[MaterializeResult]: - materialized_results: List[MaterializeResult] = [] - if input.round_completion_info: - old_manifest = input.deltacat_storage.get_delta_manifest( - input.round_completion_info.compacted_delta_locator, - **input.deltacat_storage_kwargs, - ) - - new_manifest = Manifest.of(entries=old_manifest.entries, uuid=str(uuid4())) - delta = Delta.of( - locator=DeltaLocator.of(input.write_to_partition.locator), - delta_type=DeltaType.UPSERT, - meta=new_manifest.meta, - manifest=new_manifest, - previous_stream_position=input.write_to_partition.stream_position, - properties={}, - ) - referenced_pyarrow_write_result = PyArrowWriteResult.of( - len(new_manifest.entries), - new_manifest.meta.source_content_length, - new_manifest.meta.content_length, - new_manifest.meta.record_count, - ) - materialize_result = MaterializeResult.of( - delta=delta, - task_index=input.merge_task_index, - pyarrow_write_result=referenced_pyarrow_write_result, - referenced_pyarrow_write_result=referenced_pyarrow_write_result, - ) - - materialized_results.append(materialize_result) - return materialized_results - - def _copy_manifests_from_hash_bucketing( input: MergeInput, hb_index_copy_by_reference_ids: List[int] ) -> List[MaterializeResult]: @@ -397,9 +363,6 @@ def _timed_merge(input: MergeInput) -> MergeResult: merge_utils.materialize(input, merge_file_group.hb_index, [table]) ) - if not merge_file_groups: - materialized_results.extend(_copy_previous_compacted_table(input)) - if hb_index_copy_by_ref_ids: materialized_results.extend( _copy_manifests_from_hash_bucketing(input, hb_index_copy_by_ref_ids)