Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Port CRT S3 translation mapping. #8959

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions awscli/s3transfer/crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
S3Client,
S3RequestTlsMode,
S3RequestType,
S3ResponseError,
get_recommended_throughput_target_gbps,
)
from botocore import UNSIGNED
Expand Down Expand Up @@ -203,6 +204,9 @@ def __init__(self, crt_s3_client, crt_request_serializer, osutil=None):
self._s3_args_creator = S3ClientArgsCreator(
crt_request_serializer, self._osutil
)
self._crt_exception_translator = (
crt_request_serializer.translate_crt_exception
)
self._future_coordinators = []
self._semaphore = threading.Semaphore(128) # not configurable
# A counter to create unique id's for each transfer submitted.
Expand Down Expand Up @@ -306,7 +310,10 @@ def _release_semaphore(self, **kwargs):

def _submit_transfer(self, request_type, call_args):
on_done_after_calls = [self._release_semaphore]
coordinator = CRTTransferCoordinator(transfer_id=self._id_counter)
coordinator = CRTTransferCoordinator(
transfer_id=self._id_counter,
exception_translator=self._crt_exception_translator,
)
components = {
'meta': CRTTransferMeta(self._id_counter, call_args),
'coordinator': coordinator,
Expand Down Expand Up @@ -417,6 +424,9 @@ def serialize_http_request(self, transfer_type, future):
"""
raise NotImplementedError('serialize_http_request()')

def translate_crt_exception(self, exception):
raise NotImplementedError('translate_crt_exception()')


class BotocoreCRTRequestSerializer(BaseCRTRequestSerializer):
def __init__(self, session, client_kwargs=None):
Expand Down Expand Up @@ -540,6 +550,40 @@ def serialize_http_request(self, transfer_type, future):
crt_request = self._convert_to_crt_http_request(botocore_http_request)
return crt_request

def translate_crt_exception(self, exception):
if isinstance(exception, S3ResponseError):
return self._translate_crt_s3_response_error(exception)
else:
return None

def _translate_crt_s3_response_error(self, s3_response_error):
status_code = s3_response_error.status_code
if status_code < 301:
# Botocore's exception parsing only
# runs on status codes >= 301
return None

headers = {k: v for k, v in s3_response_error.headers}
operation_name = s3_response_error.operation_name
if operation_name is not None:
service_model = self._client.meta.service_model
shape = service_model.operation_model(operation_name).output_shape
else:
shape = None

response_dict = {
'headers': botocore.awsrequest.HeadersDict(headers),
'status_code': status_code,
'body': s3_response_error.body,
}
parsed_response = self._client._response_parser.parse(
response_dict, shape=shape
)

error_code = parsed_response.get("Error", {}).get("Code")
error_class = self._client.exceptions.from_code(error_code)
return error_class(parsed_response, operation_name=operation_name)


class FakeRawResponse(BytesIO):
def stream(self, amt=1024, decode_content=None):
Expand Down Expand Up @@ -572,8 +616,11 @@ def _get_credentials(self):
class CRTTransferCoordinator:
"""A helper class for managing CRTTransferFuture"""

def __init__(self, transfer_id=None, s3_request=None):
def __init__(
self, transfer_id=None, s3_request=None, exception_translator=None
):
self.transfer_id = transfer_id
self._exception_translator = exception_translator
self._s3_request = s3_request
self._lock = threading.Lock()
self._exception = None
Expand Down Expand Up @@ -606,11 +653,28 @@ def result(self, timeout=None):
self._crt_future.result(timeout)
except KeyboardInterrupt:
self.cancel()
self._crt_future.result(timeout)
raise
except Exception as e:
self.handle_exception(e)
finally:
if self._s3_request:
self._s3_request = None
self._crt_future.result(timeout)

def handle_exception(self, exc):
translated_exc = None
if self._exception_translator:
try:
translated_exc = self._exception_translator(exc)
except Exception as e:
# Bail out if we hit an issue translating
# and raise the original error.
logger.debug("Unable to translate exception.", exc_info=e)
pass
if translated_exc is not None:
raise translated_exc from exc
else:
raise exc

def done(self):
if self._crt_future is None:
Expand Down
56 changes: 55 additions & 1 deletion tests/unit/s3transfer/test_crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import pytest
from botocore.credentials import Credentials, ReadOnlyCredentials
from botocore.exceptions import NoCredentialsError
from botocore.exceptions import ClientError, NoCredentialsError
from botocore.session import Session

from s3transfer.exceptions import TransferNotDoneError
Expand Down Expand Up @@ -164,6 +164,60 @@ def test_delete_request(self):
self.assertEqual(self.expected_host, crt_request.headers.get("host"))
self.assertIsNone(crt_request.headers.get("Authorization"))

def _create_crt_response_error(
self, status_code, body, operation_name=None
):
return awscrt.s3.S3ResponseError(
code=14343,
name='AWS_ERROR_S3_INVALID_RESPONSE_STATUS',
message='Invalid response status from request',
status_code=status_code,
headers=[
('x-amz-request-id', 'QSJHJJZR2EDYD4GQ'),
(
'x-amz-id-2',
'xDbgdKdvYZTjgpOTzm7yNP2JPrOQl+eaQvUkFdOjdJoWkIC643fgHxdsHpUKvVAfjKf5F6otEYA=',
),
('Content-Type', 'application/xml'),
('Transfer-Encoding', 'chunked'),
('Date', 'Fri, 10 Nov 2023 23:22:47 GMT'),
('Server', 'AmazonS3'),
],
body=body,
operation_name=operation_name,
)

def test_translate_get_object_404(self):
body = (
b'<?xml version="1.0" encoding="UTF-8"?>\n<Error>'
b'<Code>NoSuchKey</Code>'
b'<Message>The specified key does not exist.</Message>'
b'<Key>obviously-no-such-key.txt</Key>'
b'<RequestId>SBJ7ZQY03N1WDW9T</RequestId>'
b'<HostId>SomeHostId</HostId></Error>'
)
crt_exc = self._create_crt_response_error(404, body, 'GetObject')
boto_err = self.request_serializer.translate_crt_exception(crt_exc)
self.assertIsInstance(
boto_err, self.session.create_client('s3').exceptions.NoSuchKey
)

def test_translate_head_object_404(self):
# There's no body in a HEAD response, so we can't map it to a modeled S3 exception.
# But it should still map to a botocore ClientError
body = None
crt_exc = self._create_crt_response_error(
404, body, operation_name='HeadObject'
)
boto_err = self.request_serializer.translate_crt_exception(crt_exc)
self.assertIsInstance(boto_err, ClientError)

def test_translate_unknown_operation_404(self):
body = None
crt_exc = self._create_crt_response_error(404, body)
boto_err = self.request_serializer.translate_crt_exception(crt_exc)
self.assertIsInstance(boto_err, ClientError)


@requires_crt_pytest
class TestBotocoreCRTCredentialsWrapper:
Expand Down
Loading