Skip to content

Commit

Permalink
Provide more detailed API error exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaso committed Aug 6, 2023
1 parent fcb0a3c commit 8736afc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
14 changes: 11 additions & 3 deletions hv4gha/gh.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ class TokenResponse(TypedDict, total=False):


class GitHubAPIError(Exception):
"""Error response from the GitHub API"""
"""Any error response from the GitHub API"""


class InstallationLookupError(GitHubAPIError):
"""Failure to lookup the GitHub App installation ID"""


class TokenIssueError(GitHubAPIError):
"""Failure to issue GitHub Access Token"""


class NotInstalledError(Exception):
Expand Down Expand Up @@ -65,7 +73,7 @@ def __find_installation(self) -> str:
error_message = http_error.response.json()["message"]
except Exception: # pylint: disable=broad-exception-caught
error_message = "<Failed to parse GitHub API error response>"
raise GitHubAPIError(error_message) from http_error
raise InstallationLookupError(error_message) from http_error

for installation in response.json():
if installation["account"]["login"].lower() == self.account.lower():
Expand Down Expand Up @@ -122,7 +130,7 @@ def issue_token(
error_message = http_error.response.json()["message"]
except Exception: # pylint: disable=broad-exception-caught
error_message = "<Failed to parse GitHub API error response>"
raise GitHubAPIError(error_message) from http_error
raise TokenIssueError(error_message) from http_error

expiry = datetime.strptime(
response.json()["expires_at"], "%Y-%m-%dT%H:%M:%SZ"
Expand Down
35 changes: 28 additions & 7 deletions hv4gha/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,23 @@


class VaultAPIError(Exception):
"""Error response from the Vault API"""
"""Any error response from the Vault API"""


class AppKeyImportError(VaultAPIError):
"""Failure to upload/import the wrapped GitHub App key into Vault"""


class JWTSigningError(VaultAPIError):
"""Failure to have Vault sign a GitHub App JWT token"""


class TokenRevokeError(VaultAPIError):
"""Failure to self-revoke the Vault token"""


class WrappingKeyDownloadError(VaultAPIError):
"""Failure to download the Vault Transit wrapping key"""


class VaultTransit:
Expand Down Expand Up @@ -63,7 +79,7 @@ def __download_wrapping_key(self) -> rsa.RSAPublicKey:
error_message = "\n".join(http_error.response.json()["errors"])
except Exception: # pylint: disable=broad-exception-caught
error_message = "<Failed to parse Vault API error response>"
raise VaultAPIError(error_message) from http_error
raise WrappingKeyDownloadError(error_message) from http_error

wrapping_pem_key = response.json()["data"]["public_key"].encode()
wrapping_key = serialization.load_pem_public_key(wrapping_pem_key)
Expand All @@ -90,7 +106,10 @@ def __wrap_key(self, der_app_key: bytes, wrapping_key: rsa.RSAPublicKey) -> str:
return wrapped_b64

def __api_write(
self, api_path: str, payload: None | dict[str, Any] = None
self,
api_path: str,
payload: None | dict[str, Any] = None,
vault_exception: type[Exception] = VaultAPIError,
) -> requests.models.Response:
update_url = self.vault_addr + api_path

Expand All @@ -111,7 +130,7 @@ def __api_write(
error_message = "\n".join(http_error.response.json()["errors"])
except Exception: # pylint: disable=broad-exception-caught
error_message = "<Failed to parse Vault API error response>"
raise VaultAPIError(error_message) from http_error
raise vault_exception(error_message) from http_error

return response

Expand All @@ -135,7 +154,7 @@ def import_key(self, key_name: str, pem_app_key: bytes) -> None:
"allow_plaintext_backup": False,
}

self.__api_write(api_path, payload)
self.__api_write(api_path, payload, AppKeyImportError)

def __prepare_jwt(self, app_id: str) -> str:
now = int(datetime.now().strftime("%s"))
Expand Down Expand Up @@ -178,7 +197,9 @@ def sign_jwt(self, key_name: str, app_id: str) -> str:
"signature_algorithm": "pkcs1v15",
}

response: requests.models.Response = self.__api_write(api_path, payload)
response: requests.models.Response = self.__api_write(
api_path, payload, JWTSigningError
)

signature: str = response.json()["data"]["signature"].removeprefix("vault:v1:")
signature = self.__b64str(base64.b64decode(signature), urlsafe=True)
Expand All @@ -190,4 +211,4 @@ def revoke_token(self) -> None:
"""Vault Token self-revoke"""

api_path = "/v1/auth/token/revoke-self"
self.__api_write(api_path)
self.__api_write(api_path, vault_exception=TokenRevokeError)

0 comments on commit 8736afc

Please sign in to comment.