Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat/documentation] Documented openid4vp plus refactoring and some todo #202

Merged
merged 57 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
c49bdfb
Merge pull request #104 from italia/dev
Sep 9, 2023
c6a81ca
Merge pull request #134 from italia/dev
Oct 9, 2023
70f50fa
Merge branch 'main' of https://github.com/italia/eudi-wallet-it-satos…
PascalDR Oct 16, 2023
46ff77f
Merge branch 'dev' of https://github.com/italia/eudi-wallet-it-satosa…
PascalDR Nov 22, 2023
56feaa5
feat: added policy apply on metadata
PascalDR Nov 23, 2023
1ea7bac
test: added intial tests for TrustEvaluationHelper
PascalDR Nov 23, 2023
47fd884
fix: fixed validation issues
PascalDR Nov 23, 2023
bd36786
feat: implemented method add_trust_attestation_metadata
PascalDR Nov 23, 2023
aa0133f
test: added test for add_trust_attestation_metadata
PascalDR Nov 23, 2023
4e44ecf
Merge branch 'dev' of https://github.com/italia/eudi-wallet-it-satosa…
PascalDR Nov 23, 2023
f512f59
Merge branch 'dev' of https://github.com/italia/eudi-wallet-it-satosa…
PascalDR Nov 27, 2023
1d3fa1b
fix: added metadata association by metadata_type field
PascalDR Nov 27, 2023
d90974c
fix: minor fix to test for add_trust_attestation_metadata's data type
PascalDR Nov 27, 2023
850d432
chore: renamed test file
PascalDR Nov 27, 2023
c062b35
chore: Removed comment
PascalDR Nov 27, 2023
bf3843c
fix: fixed x509 verification exception handling
PascalDR Nov 27, 2023
5a74ea0
chore: fix typo
PascalDR Nov 27, 2023
daeb343
fix: merged federation and metadata policy implementation
PascalDR Nov 29, 2023
f94b063
test: adapted tests
PascalDR Nov 29, 2023
67a4d12
feat: added final_metadata property
PascalDR Nov 29, 2023
059e94b
feat: added chain discovery plus refactoring
PascalDR Nov 29, 2023
5782f15
Merge branch 'dev' of https://github.com/italia/eudi-wallet-it-satosa…
PascalDR Nov 29, 2023
b807998
docs: documented file class and functions
PascalDR Dec 4, 2023
62edae6
fix: fixed trust_anchor_entity_conf handling
PascalDR Dec 4, 2023
9243e61
docs: documented trust_chain_builder.py
PascalDR Dec 4, 2023
24a8782
fix: moved implementation of get_http_url in utils.py
PascalDR Dec 5, 2023
29a876e
fix: fixed response handling
PascalDR Dec 5, 2023
1de5916
docs: documented file class and function plus refactoring
PascalDR Dec 5, 2023
7d5a273
docs: documented file __init__.py
PascalDR Dec 6, 2023
baaede8
docs: added docs for http_client.py
PascalDR Dec 6, 2023
d5758bd
Merge branch 'dev' of https://github.com/italia/eudi-wallet-it-satosa…
PascalDR Dec 6, 2023
2d04f1c
docs: documented the content of __init__.py
PascalDR Dec 11, 2023
0f61235
docs: documented contento of __init__.py
PascalDR Dec 11, 2023
1fef461
fix: method name refactoring
PascalDR Dec 11, 2023
f3fae08
fix: added exception
PascalDR Dec 11, 2023
d9a8f3e
fix: refactored method find_jwk
PascalDR Dec 11, 2023
eb0dfda
docs: fixed documentation
PascalDR Dec 11, 2023
46ac8bb
fix: refactoring
PascalDR Dec 11, 2023
3559531
docs: documented content of utils.py
PascalDR Dec 11, 2023
f7a85cc
docs: documented __init__.py content
PascalDR Dec 11, 2023
3d40222
Merge branch 'dev' of https://github.com/italia/eudi-wallet-it-satosa…
PascalDR Dec 11, 2023
f221ac1
fix: Resolved todo (what if the credential is not a JWT?)
PascalDR Dec 13, 2023
052262a
feat: implemented is_jwe_format and is_jws_format
PascalDR Dec 13, 2023
2711ba8
test: amplied test
PascalDR Dec 13, 2023
8a99ab1
fix: refactored code
PascalDR Dec 13, 2023
9b54e93
feat: resolved todo (detect if it is encrypted otherwise)
PascalDR Dec 13, 2023
2c3dc7d
fix: code refactoring
PascalDR Dec 13, 2023
61707f1
docs: documented content of direct_post_response.py
PascalDR Dec 13, 2023
084d1f2
fix: amplied error messages
PascalDR Dec 13, 2023
2ab4002
feat: resolved todo (automatic detection of the credential)
PascalDR Dec 13, 2023
2c2c80e
docs: amplied the documentation
PascalDR Dec 13, 2023
ebbf8a4
fix: refactored code
PascalDR Dec 13, 2023
7543752
fix: added dependency
PascalDR Dec 13, 2023
b8a9b27
docs: documented content of vp_sd_jwt.py
PascalDR Dec 13, 2023
91c5952
fix: refactored code
PascalDR Dec 13, 2023
b89312a
docs: documented content of vp.py
PascalDR Dec 13, 2023
c2cced2
Merge branch 'dev' of https://github.com/italia/eudi-wallet-it-satosa…
PascalDR Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyeudiw/jwk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def __init__(
:type hash_func: str
:param ec_crv: a string that represents the curve to use with the instance.
:type ec_crv: str

:raises NotImplementedError: the key_type is not implemented
"""
kwargs = {}
self.kid = ""
Expand Down
15 changes: 12 additions & 3 deletions pyeudiw/jwt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Union, Any

import cryptojwt
from cryptojwt.exception import VerificationError
from cryptojwt.jwe.jwe import factory
from cryptojwt.jwe.jwe_ec import JWE_EC
from cryptojwt.jwe.jwe_rsa import JWE_RSA
Expand All @@ -14,6 +13,8 @@
from pyeudiw.jwk.exceptions import KidError
from pyeudiw.jwt.utils import decode_jwt_header

from .exceptions import JWEDecryptionError, JWSVerificationError

DEFAULT_HASH_FUNC = "SHA-256"

DEFAULT_SIG_KTY_MAP = {
Expand Down Expand Up @@ -103,13 +104,15 @@ def decrypt(self, jwe: str) -> dict:
:param jwe: A string representing the jwe.
:type jwe: str

:raises JWEDecryptionError: if jwe field is not in a JWE Format

:returns: A dict that represents the payload of decrypted JWE.
:rtype: dict
"""
try:
jwe_header = decode_jwt_header(jwe)
except (binascii.Error, Exception) as e:
raise VerificationError("The JWT is not valid")
raise JWEDecryptionError(f"Not a valid JWE format for the following reason: {e}")

_alg = jwe_header.get("alg")
_enc = jwe_header.get("enc")
Expand Down Expand Up @@ -186,12 +189,18 @@ def verify(self, jws: str, **kwargs) -> (str | Any | bytes):
:type jws: str
:param kwargs: Other optional fields to generate the JWE.

:raises JWSVerificationError: if jws field is not in a JWS Format

:returns: A string that represents the payload of JWS.
:rtype: str
"""
_key = key_from_jwk_dict(self.jwk.as_dict())
_jwk_dict = self.jwk.as_dict()
_head = decode_jwt_header(jws)

try:
_head = decode_jwt_header(jws)
except (binascii.Error, Exception) as e:
raise JWSVerificationError(f"Not a valid JWS format for the following reason: {e}")

if _head.get("kid"):
if _head["kid"] != _jwk_dict["kid"]: # pragma: no cover
Expand Down
3 changes: 3 additions & 0 deletions pyeudiw/jwt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ class JWEDecryptionError(Exception):
pass

class JWTInvalidElementPosition(Exception):
pass

class JWSVerificationError(Exception):
pass
37 changes: 37 additions & 0 deletions pyeudiw/jwt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,40 @@ def is_jwt_format(jwt: str) -> bool:

res = re.match(JWT_REGEXP, jwt)
return bool(res)

def is_jwe_format(jwt: str):
"""
Check if a string is in JWE format.

:param jwt: a string that represents the jwt.
:type jwt: str

:returns: True if the string is a JWE, False otherwise.
:rtype: bool
"""

if not is_jwt_format(jwt):
return False

header = decode_jwt_header(jwt)

if header.get("enc", None) == None:
return False

return True

def is_jws_format(jwt: str):
"""
Check if a string is in JWS format.

:param jwt: a string that represents the jwt.
:type jwt: str

:returns: True if the string is a JWS, False otherwise.
:rtype: bool
"""
breakpoint()
if not is_jwt_format(jwt):
return False

return not is_jwe_format(jwt)
117 changes: 86 additions & 31 deletions pyeudiw/openid4vp/direct_post_response.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@

from typing import Dict
from pyeudiw.jwk import JWK
from pyeudiw.jwt import JWEHelper
from pyeudiw.jwt.exceptions import JWEDecryptionError
from pyeudiw.jwt import JWEHelper, JWSHelper
from pyeudiw.jwk.exceptions import KidNotFoundError
from pyeudiw.jwt.utils import decode_jwt_header
from pyeudiw.jwt.utils import decode_jwt_header, is_jwe_format
from pyeudiw.openid4vp.exceptions import (
VPNotFound,
VPInvalidNonce,
NoNonceInVPToken
)
from pyeudiw.openid4vp.schemas.vp_token import VPTokenPayload, VPTokenHeader
from pyeudiw.openid4vp.vp import Vp

from pydantic import ValidationError

class DirectPostResponse:
def __init__(self, jwt: str, jwks_by_kids: dict, nonce: str = ""):

"""
Helper class for generate Direct Post Response.
"""
def __init__(self, jwt: str, jwks_by_kids: Dict[str, dict], nonce: str = ""):
"""
Generate an instance of DirectPostResponse.

:param jwt: a string that represents the jwt.
:type jwt: str
:param jwks_by_kids: a dictionary that contains one or more JWKs with the KID as the key.
:type jwks_by_kids: Dict[str, dict]
:param nonce: a string that represents the nonce.
:type nonce: str
"""
self.headers = decode_jwt_header(jwt)
self.jwks_by_kids = jwks_by_kids
self.jwt = jwt
Expand All @@ -26,35 +37,48 @@ def __init__(self, jwt: str, jwks_by_kids: dict, nonce: str = ""):
self.credentials_by_issuer: dict = {}
self._claims_by_issuer: dict = {}

@property
def payload(self) -> dict:
# TODO: detect if it is encrypted otherwise ...
# here we support only the encrypted jwt
if not self._payload:
self.decrypt()
return self._payload
def _decode_payload(self) -> None:
"""
Internally decrypts the content of the JWT.

def decrypt(self) -> None:
:raises JWSVerificationError: if jws field is not in a JWS Format
:raises JWEDecryptionError: if jwe field is not in a JWE Format
"""
_kid = self.headers.get('kid', None)
if not _kid:
raise KidNotFoundError(
f"The JWT headers {self.headers} doesnt have any KID value"
)
self.jwk = JWK(self.jwks_by_kids[_kid])
jweHelper = JWEHelper(self.jwk)
try:

if is_jwe_format(self.jwt):
jweHelper = JWEHelper(self.jwk)
self._payload = jweHelper.decrypt(self.jwt)
except Exception as e:
_msg = f"Response decryption error: {e}"
raise JWEDecryptionError(_msg)
else:
jwsHelper = JWSHelper(self.jwk)
self._payload = jwsHelper.verify(self.jwt)

def load_nonce(self, nonce: str):
def load_nonce(self, nonce: str) -> None:
"""
Load a nonce string inside the body of response.

:param nonce: a string that represents the nonce.
:type nonce: str
"""
self.nonce = nonce

def validate(self) -> bool:
def _validate_vp(self, vp: dict) -> bool:
"""
Validate a single Verifiable Presentation.

# check nonces
for vp in self.get_presentation_vps():
:param vp: the verifiable presentation to validate.
:type vp: str

:returns: True if is valid, False otherwhise.
:rtype: bool
"""
try:
# check nonce
if self.nonce:
if not vp.payload.get('nonce', None):
raise NoNonceInVPToken()
Expand All @@ -66,23 +90,40 @@ def validate(self) -> bool:
)
VPTokenPayload(**vp.payload)
VPTokenHeader(**vp.headers)
except ValidationError:
return False
return True


def validate(self) -> bool:
"""
Validates all VPs inside JWT's body.

:returns: True if all VP are valid, False otherwhise.
:rtype: bool
"""

for vp in self.get_presentation_vps():
if not self._validate_vp(vp):
return False

return True

@property
def vps(self):
if not self._vps:
self.get_presentation_vps()
return self._vps
def get_presentation_vps(self) -> list[dict]:
"""
Returns the presentation's verifiable presentations

def get_presentation_vps(self):
:returns: the list of vps.
:rtype: list[dict]
"""
if self._vps:
return self._vps

_vps = self.payload.get('vp_token', [])
vps = [_vps] if isinstance(_vps, str) else _vps

if not vps:
raise VPNotFound("vp is null")
raise VPNotFound(f"Vps for response with nonce \"{self.nonce}\" are empty")

for vp in vps:
_vp = Vp(vp)
Expand All @@ -95,3 +136,17 @@ def get_presentation_vps(self):
self.credentials_by_issuer[cred_iss].append(_vp.payload['vp'])

return self._vps

@property
def vps(self) -> list[dict]:
"""Returns the presentation's verifiable presentations"""
if not self._vps:
self.get_presentation_vps()
return self._vps

@property
def payload(self) -> dict:
"""Returns the decoded payload of presentation"""
if not self._payload:
self._decode_payload()
return self._payload
67 changes: 43 additions & 24 deletions pyeudiw/openid4vp/vp.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,62 @@

from pyeudiw.jwt.utils import decode_jwt_payload, decode_jwt_header
from pyeudiw.openid4vp.vp_sd_jwt import VpSdJwt


class Vp(VpSdJwt):
"Class for SD-JWT Format"
def __init__(self, jwt: str) -> None:
"""
Generates a VP istance.

def __init__(self, jwt: str):
# TODO: what if the credential is not a JWT?
self.headers = decode_jwt_header(jwt)
self.jwt = jwt
self.payload = decode_jwt_payload(jwt)
:param jwt: a string that represents the jwt.
:type jwt: str

self.credential_headers: dict = {}
self.credential_payload: dict = {}
:raises InvalidVPToken: if the jwt field's value is not a JWT.
"""
super().__init__(jwt)

self.parse_digital_credential()
self.disclosed_user_attributes: dict = {}

def _detect_vp_type(self):
# TODO - automatic detection of the credential
return 'jwt'

def get_credential_jwks(self):
def _detect_vp_type(self) -> str:
"""
Detects and return the type of verifiable presentation.

:returns: the type of VP.
:rtype: str
"""
return self.headers["typ"].lower()

def get_credential_jwks(self) -> list[dict]:
"""
Returns the credential JWKs.

:returns: the list containing credential's JWKs.
:rtype: list[dict]
"""
if not self.credential_jwks:
return {}
return self.credential_jwks

@property
def credential_issuer(self):
if not self.credential_payload.get('iss', None):
self.parse_digital_credential()
return self.credential_payload.get('iss', None)

def parse_digital_credential(self):
def parse_digital_credential(self) -> None:
"""
Parse the digital credential of VP.

:raises NotImplementedError: if VP Digital credentials type not implemented.
"""
_typ = self._detect_vp_type()
if _typ == 'jwt':
self.credential_headers = decode_jwt_header(self.payload['vp'])
self.credential_payload = decode_jwt_payload(self.payload['vp'])
else:

if _typ != 'jwt':
raise NotImplementedError(
f"VP Digital credentials type not implemented yet: {_typ}"
)

self.credential_headers = decode_jwt_header(self.payload['vp'])
self.credential_payload = decode_jwt_payload(self.payload['vp'])

@property
def credential_issuer(self) -> str:
"""Returns the credential issuer"""
if not self.credential_payload.get('iss', None):
self.parse_digital_credential()
return self.credential_payload.get('iss', None)
Loading
Loading