Commit

Allow S3 client kwargs as an argument of compact_partition (#155)
* Allow S3 client kwargs as an argument of compact_partition

* Change the default to an empty dict

* Add s3_client_kwargs to the rcf (round completion file) utilities

* Bump version to 0.1.18b8
raghumdani authored Jul 25, 2023
1 parent 24240ba commit fabc387
Showing 3 changed files with 37 additions and 10 deletions.
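
The change threads an optional s3_client_kwargs dict from compact_partition down to the S3 helpers used for round completion files and compaction audits. Below is a minimal sketch of the forwarding pattern, assuming the helpers build a boto3 client per call; the upload helper, bucket, and key are illustrative stand-ins, not deltacat's actual s3_utils implementation.

import json

import boto3


def upload(s3_url: str, body: str, **s3_client_kwargs) -> None:
    # Forward caller-supplied kwargs (e.g. region_name, endpoint_url) straight
    # to the boto3 client constructor, mirroring the **s3_client_kwargs
    # passthrough added in this commit.
    s3 = boto3.client("s3", **s3_client_kwargs)
    bucket, key = s3_url.replace("s3://", "", 1).split("/", 1)
    s3.put_object(Bucket=bucket, Key=key, Body=body.encode("utf-8"))


# Hypothetical call site; the bucket name and payload are placeholders.
upload(
    "s3://my-compaction-artifacts/audit.json",
    json.dumps({"step": "hash_bucket", "elapsed_s": 12.3}),
    region_name="us-east-1",
)
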
2 changes: 1 addition & 1 deletion deltacat/__init__.py
@@ -43,7 +43,7 @@

deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

__version__ = "0.1.18b4"
__version__ = "0.1.18b8"


__all__ = [
30 changes: 25 additions & 5 deletions deltacat/compute/compactor/compaction_session.py
@@ -115,6 +115,7 @@ def compact_partition(
read_kwargs_provider: Optional[ReadKwargsProvider] = None,
s3_table_writer_kwargs: Optional[Dict[str, Any]] = None,
object_store: Optional[IObjectStore] = RayPlasmaObjectStore(),
s3_client_kwargs: Optional[Dict[str, Any]] = {},
deltacat_storage=unimplemented_deltacat_storage,
**kwargs,
) -> Optional[str]:
@@ -155,6 +156,7 @@
read_kwargs_provider,
s3_table_writer_kwargs,
object_store,
s3_client_kwargs,
deltacat_storage,
**kwargs,
)
@@ -174,6 +176,7 @@
compaction_artifact_s3_bucket,
new_rcf_partition_locator,
new_rci,
**s3_client_kwargs,
)
logger.info(f"Completed compaction session for: {source_partition_locator}")
return round_completion_file_s3_url
@@ -201,6 +204,7 @@ def _execute_compaction_round(
read_kwargs_provider: Optional[ReadKwargsProvider],
s3_table_writer_kwargs: Optional[Dict[str, Any]],
object_store: Optional[IObjectStore],
s3_client_kwargs: Optional[Dict[str, Any]],
deltacat_storage=unimplemented_deltacat_storage,
**kwargs,
) -> Tuple[Optional[Partition], Optional[RoundCompletionInfo], Optional[str]]:
@@ -284,7 +288,7 @@ def _execute_compaction_round(
round_completion_info = None
if not rebase_source_partition_locator:
round_completion_info = rcf.read_round_completion_file(
compaction_artifact_s3_bucket, source_partition_locator
compaction_artifact_s3_bucket, source_partition_locator, **s3_client_kwargs
)
if not round_completion_info:
logger.info(
@@ -330,7 +334,11 @@
delta_discovery_end - delta_discovery_start
)

s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit)))
s3_utils.upload(
compaction_audit.audit_url,
str(json.dumps(compaction_audit)),
**s3_client_kwargs,
)

if not input_deltas:
logger.info("No input deltas found to compact.")
@@ -424,7 +432,11 @@ def _execute_compaction_round(
hb_end - hb_start,
)

s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit)))
s3_utils.upload(
compaction_audit.audit_url,
str(json.dumps(compaction_audit)),
**s3_client_kwargs,
)

all_hash_group_idx_to_obj_id = defaultdict(list)
for hb_result in hb_results:
@@ -539,7 +551,11 @@
# parallel step 3:
# materialize records to keep by index

s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit)))
s3_utils.upload(
compaction_audit.audit_url,
str(json.dumps(compaction_audit)),
**s3_client_kwargs,
)

materialize_start = time.monotonic()

@@ -641,7 +657,11 @@
mat_results, telemetry_time_hb + telemetry_time_dd + telemetry_time_materialize
)

s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit)))
s3_utils.upload(
compaction_audit.audit_url,
str(json.dumps(compaction_audit)),
**s3_client_kwargs,
)

new_round_completion_info = RoundCompletionInfo.of(
last_stream_position_compacted,
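
For reference, the dict a caller can now hand to compact_partition as s3_client_kwargs holds standard boto3 client() arguments and is forwarded unchanged (as **s3_client_kwargs) to the s3_utils and round completion file calls shown above. The values below are placeholders, not deltacat defaults.

from botocore.config import Config

# Illustrative s3_client_kwargs: keys are standard boto3.client("s3", ...)
# arguments; the region and retry settings are placeholder values.
s3_client_kwargs = {
    "region_name": "us-east-1",
    "config": Config(retries={"max_attempts": 10, "mode": "adaptive"}),
}

# A caller would then supply this to the compaction session, e.g.
#   compact_partition(..., s3_client_kwargs=s3_client_kwargs)
# and it reaches every s3_utils.upload / rcf.read_round_completion_file call
# in this diff.
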
15 changes: 11 additions & 4 deletions deltacat/compute/compactor/utils/round_completion_file.py
@@ -1,6 +1,6 @@
import json
import logging

from typing import Dict, Any
from deltacat import logs
from deltacat.compute.compactor import RoundCompletionInfo
from deltacat.storage import PartitionLocator
@@ -19,7 +19,9 @@ def get_round_completion_file_s3_url(


def read_round_completion_file(
bucket: str, source_partition_locator: PartitionLocator
bucket: str,
source_partition_locator: PartitionLocator,
**s3_client_kwargs: Optional[Dict[str, Any]],
) -> RoundCompletionInfo:

round_completion_file_url = get_round_completion_file_s3_url(
@@ -28,7 +30,7 @@ def read_round_completion_file(
)
logger.info(f"reading round completion file from: {round_completion_file_url}")
round_completion_info = None
result = s3_utils.download(round_completion_file_url, False)
result = s3_utils.download(round_completion_file_url, False, **s3_client_kwargs)
if result:
json_str = result["Body"].read().decode("utf-8")
round_completion_info = RoundCompletionInfo(json.loads(json_str))
@@ -41,6 +43,7 @@ def write_round_completion_file(
source_partition_locator: Optional[PartitionLocator],
round_completion_info: RoundCompletionInfo,
completion_file_s3_url: str = None,
**s3_client_kwargs: Optional[Dict[str, Any]],
) -> str:
if bucket is None and completion_file_s3_url is None:
raise AssertionError("Either bucket or completion_file_s3_url must be passed")
@@ -52,6 +55,10 @@
source_partition_locator,
)
logger.info(f"writing round completion file to: {completion_file_s3_url}")
s3_utils.upload(completion_file_s3_url, str(json.dumps(round_completion_info)))
s3_utils.upload(
completion_file_s3_url,
str(json.dumps(round_completion_info)),
**s3_client_kwargs,
)
logger.info(f"round completion file written to: {completion_file_s3_url}")
return completion_file_s3_url
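
One detail worth noting: compact_partition defaults s3_client_kwargs to {}, and expanding an empty dict with ** at the call sites above contributes no keyword arguments, so callers that never pass the parameter see unchanged behavior. A small self-contained demonstration follows; the download stub is a placeholder, not deltacat's s3_utils.download.

def download(url: str, fail_if_not_found: bool = True, **s3_client_kwargs):
    # Stand-in helper that just echoes the client kwargs it received.
    return {"url": url, "client_kwargs": s3_client_kwargs}


s3_client_kwargs = {}
print(download("s3://bucket/rcf.json", False, **s3_client_kwargs))
# -> {'url': 's3://bucket/rcf.json', 'client_kwargs': {}}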
