Skip to content

Commit

Permalink
Allow s3_client_kwargs to be passed into repartition (#158)
Browse files Browse the repository at this point in the history
* Allow s3_client_kwargs to be passed into repartition

* Update compaction session s3_client_kwargs to default to None
  • Loading branch information
rkenmi authored Jul 26, 2023
1 parent 2f75297 commit 6471727
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
2 changes: 1 addition & 1 deletion deltacat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

__version__ = "0.1.18b9"
__version__ = "0.1.18b10"


__all__ = [
Expand Down
5 changes: 4 additions & 1 deletion deltacat/compute/compactor/compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def compact_partition(
read_kwargs_provider: Optional[ReadKwargsProvider] = None,
s3_table_writer_kwargs: Optional[Dict[str, Any]] = None,
object_store: Optional[IObjectStore] = RayPlasmaObjectStore(),
s3_client_kwargs: Optional[Dict[str, Any]] = {},
s3_client_kwargs: Optional[Dict[str, Any]] = None,
deltacat_storage=unimplemented_deltacat_storage,
**kwargs,
) -> Optional[str]:
Expand Down Expand Up @@ -284,6 +284,9 @@ def _execute_compaction_round(
max_parallelism = int(cluster_cpus)
logger.info(f"Max parallelism: {max_parallelism}")

if s3_client_kwargs is None:
s3_client_kwargs = {}

# read the results from any previously completed compaction round
round_completion_info = None
if not rebase_source_partition_locator:
Expand Down
5 changes: 5 additions & 0 deletions deltacat/compute/compactor/repartition_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def repartition(
pg_config: Optional[PlacementGroupConfig] = None,
list_deltas_kwargs: Optional[Dict[str, Any]] = None,
read_kwargs_provider: Optional[ReadKwargsProvider] = None,
s3_client_kwargs: Optional[Dict[str, Any]] = None,
deltacat_storage=unimplemented_deltacat_storage,
**kwargs,
) -> Optional[str]:
Expand Down Expand Up @@ -166,9 +167,13 @@ def repartition(
bit_width_of_sort_keys,
None,
)
if s3_client_kwargs is None:
s3_client_kwargs = {}

return rcf.write_round_completion_file(
None,
None,
repartition_completion_info,
repartition_completion_file_s3_url,
**s3_client_kwargs,
)

0 comments on commit 6471727

Please sign in to comment.