Skip to content

Commit

Permalink
[Feat/documentation] Documented openid4vp plus refactoring and some t…
Browse files Browse the repository at this point in the history
…odo (#202)

* feat: added policy apply on metadata

* test: added intial tests for TrustEvaluationHelper

* fix: fixed validation issues

* feat: implemented method add_trust_attestation_metadata

* test: added test for add_trust_attestation_metadata

* fix: added metadata association by metadata_type field

* fix: minor fix to test for add_trust_attestation_metadata's data type

* chore: renamed test file

* chore: Removed comment

* fix: fixed x509 verification exception handling

* chore: fix typo

* fix: merged federation and metadata policy implementation

* test: adapted tests

* feat: added final_metadata property

* feat: added chain discovery plus refactoring

* docs: documented file class and functions

* fix: fixed trust_anchor_entity_conf handling

* docs: documented trust_chain_builder.py

* fix: moved implementation of get_http_url in utils.py

* fix: fixed response handling

* docs: documented file class and function plus refactoring

* docs: documented file __init__.py

* docs: added docs for http_client.py

* docs: documented the content of __init__.py

* docs: documented contento of __init__.py

* fix: method name refactoring

* fix: added exception

* fix: refactored method find_jwk

* docs: fixed documentation

* fix: refactoring

* docs: documented content of utils.py

* docs: documented __init__.py content

* fix: Resolved todo (what if the credential is not a JWT?)

* feat: implemented is_jwe_format and is_jws_format

* test: amplied test

* fix: refactored code

* feat: resolved todo (detect if it is encrypted otherwise)

* fix: code refactoring

* docs: documented content of direct_post_response.py

* fix: amplied error messages

* feat: resolved todo (automatic detection of the credential)

* docs: amplied the documentation

* fix: refactored code

* fix: added dependency

* docs: documented content of vp_sd_jwt.py

* fix: refactored code

* docs: documented content of vp.py

---------

Co-authored-by: Giuseppe De Marco <[email protected]>
  • Loading branch information
PascalDR and Giuseppe De Marco authored Dec 13, 2023
1 parent 3d2ced7 commit f305d22
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 62 deletions.
2 changes: 2 additions & 0 deletions pyeudiw/jwk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def __init__(
:type hash_func: str
:param ec_crv: a string that represents the curve to use with the instance.
:type ec_crv: str
:raises NotImplementedError: the key_type is not implemented
"""
kwargs = {}
self.kid = ""
Expand Down
15 changes: 12 additions & 3 deletions pyeudiw/jwt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Union, Any

import cryptojwt
from cryptojwt.exception import VerificationError
from cryptojwt.jwe.jwe import factory
from cryptojwt.jwe.jwe_ec import JWE_EC
from cryptojwt.jwe.jwe_rsa import JWE_RSA
Expand All @@ -14,6 +13,8 @@
from pyeudiw.jwk.exceptions import KidError
from pyeudiw.jwt.utils import decode_jwt_header

from .exceptions import JWEDecryptionError, JWSVerificationError

DEFAULT_HASH_FUNC = "SHA-256"

DEFAULT_SIG_KTY_MAP = {
Expand Down Expand Up @@ -103,13 +104,15 @@ def decrypt(self, jwe: str) -> dict:
:param jwe: A string representing the jwe.
:type jwe: str
:raises JWEDecryptionError: if jwe field is not in a JWE Format
:returns: A dict that represents the payload of decrypted JWE.
:rtype: dict
"""
try:
jwe_header = decode_jwt_header(jwe)
except (binascii.Error, Exception) as e:
raise VerificationError("The JWT is not valid")
raise JWEDecryptionError(f"Not a valid JWE format for the following reason: {e}")

_alg = jwe_header.get("alg")
_enc = jwe_header.get("enc")
Expand Down Expand Up @@ -186,12 +189,18 @@ def verify(self, jws: str, **kwargs) -> (str | Any | bytes):
:type jws: str
:param kwargs: Other optional fields to generate the JWE.
:raises JWSVerificationError: if jws field is not in a JWS Format
:returns: A string that represents the payload of JWS.
:rtype: str
"""
_key = key_from_jwk_dict(self.jwk.as_dict())
_jwk_dict = self.jwk.as_dict()
_head = decode_jwt_header(jws)

try:
_head = decode_jwt_header(jws)
except (binascii.Error, Exception) as e:
raise JWSVerificationError(f"Not a valid JWS format for the following reason: {e}")

if _head.get("kid"):
if _head["kid"] != _jwk_dict["kid"]: # pragma: no cover
Expand Down
3 changes: 3 additions & 0 deletions pyeudiw/jwt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ class JWEDecryptionError(Exception):
pass

class JWTInvalidElementPosition(Exception):
pass

class JWSVerificationError(Exception):
pass
37 changes: 37 additions & 0 deletions pyeudiw/jwt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,40 @@ def is_jwt_format(jwt: str) -> bool:

res = re.match(JWT_REGEXP, jwt)
return bool(res)

def is_jwe_format(jwt: str):
"""
Check if a string is in JWE format.
:param jwt: a string that represents the jwt.
:type jwt: str
:returns: True if the string is a JWE, False otherwise.
:rtype: bool
"""

if not is_jwt_format(jwt):
return False

header = decode_jwt_header(jwt)

if header.get("enc", None) == None:
return False

return True

def is_jws_format(jwt: str):
"""
Check if a string is in JWS format.
:param jwt: a string that represents the jwt.
:type jwt: str
:returns: True if the string is a JWS, False otherwise.
:rtype: bool
"""
breakpoint()
if not is_jwt_format(jwt):
return False

return not is_jwe_format(jwt)
117 changes: 86 additions & 31 deletions pyeudiw/openid4vp/direct_post_response.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@

from typing import Dict
from pyeudiw.jwk import JWK
from pyeudiw.jwt import JWEHelper
from pyeudiw.jwt.exceptions import JWEDecryptionError
from pyeudiw.jwt import JWEHelper, JWSHelper
from pyeudiw.jwk.exceptions import KidNotFoundError
from pyeudiw.jwt.utils import decode_jwt_header
from pyeudiw.jwt.utils import decode_jwt_header, is_jwe_format
from pyeudiw.openid4vp.exceptions import (
VPNotFound,
VPInvalidNonce,
NoNonceInVPToken
)
from pyeudiw.openid4vp.schemas.vp_token import VPTokenPayload, VPTokenHeader
from pyeudiw.openid4vp.vp import Vp

from pydantic import ValidationError

class DirectPostResponse:
def __init__(self, jwt: str, jwks_by_kids: dict, nonce: str = ""):

"""
Helper class for generate Direct Post Response.
"""
def __init__(self, jwt: str, jwks_by_kids: Dict[str, dict], nonce: str = ""):
"""
Generate an instance of DirectPostResponse.
:param jwt: a string that represents the jwt.
:type jwt: str
:param jwks_by_kids: a dictionary that contains one or more JWKs with the KID as the key.
:type jwks_by_kids: Dict[str, dict]
:param nonce: a string that represents the nonce.
:type nonce: str
"""
self.headers = decode_jwt_header(jwt)
self.jwks_by_kids = jwks_by_kids
self.jwt = jwt
Expand All @@ -26,35 +37,48 @@ def __init__(self, jwt: str, jwks_by_kids: dict, nonce: str = ""):
self.credentials_by_issuer: dict = {}
self._claims_by_issuer: dict = {}

@property
def payload(self) -> dict:
# TODO: detect if it is encrypted otherwise ...
# here we support only the encrypted jwt
if not self._payload:
self.decrypt()
return self._payload
def _decode_payload(self) -> None:
"""
Internally decrypts the content of the JWT.
def decrypt(self) -> None:
:raises JWSVerificationError: if jws field is not in a JWS Format
:raises JWEDecryptionError: if jwe field is not in a JWE Format
"""
_kid = self.headers.get('kid', None)
if not _kid:
raise KidNotFoundError(
f"The JWT headers {self.headers} doesnt have any KID value"
)
self.jwk = JWK(self.jwks_by_kids[_kid])
jweHelper = JWEHelper(self.jwk)
try:

if is_jwe_format(self.jwt):
jweHelper = JWEHelper(self.jwk)
self._payload = jweHelper.decrypt(self.jwt)
except Exception as e:
_msg = f"Response decryption error: {e}"
raise JWEDecryptionError(_msg)
else:
jwsHelper = JWSHelper(self.jwk)
self._payload = jwsHelper.verify(self.jwt)

def load_nonce(self, nonce: str):
def load_nonce(self, nonce: str) -> None:
"""
Load a nonce string inside the body of response.
:param nonce: a string that represents the nonce.
:type nonce: str
"""
self.nonce = nonce

def validate(self) -> bool:
def _validate_vp(self, vp: dict) -> bool:
"""
Validate a single Verifiable Presentation.
# check nonces
for vp in self.get_presentation_vps():
:param vp: the verifiable presentation to validate.
:type vp: str
:returns: True if is valid, False otherwhise.
:rtype: bool
"""
try:
# check nonce
if self.nonce:
if not vp.payload.get('nonce', None):
raise NoNonceInVPToken()
Expand All @@ -66,23 +90,40 @@ def validate(self) -> bool:
)
VPTokenPayload(**vp.payload)
VPTokenHeader(**vp.headers)
except ValidationError:
return False
return True


def validate(self) -> bool:
"""
Validates all VPs inside JWT's body.
:returns: True if all VP are valid, False otherwhise.
:rtype: bool
"""

for vp in self.get_presentation_vps():
if not self._validate_vp(vp):
return False

return True

@property
def vps(self):
if not self._vps:
self.get_presentation_vps()
return self._vps
def get_presentation_vps(self) -> list[dict]:
"""
Returns the presentation's verifiable presentations
def get_presentation_vps(self):
:returns: the list of vps.
:rtype: list[dict]
"""
if self._vps:
return self._vps

_vps = self.payload.get('vp_token', [])
vps = [_vps] if isinstance(_vps, str) else _vps

if not vps:
raise VPNotFound("vp is null")
raise VPNotFound(f"Vps for response with nonce \"{self.nonce}\" are empty")

for vp in vps:
_vp = Vp(vp)
Expand All @@ -95,3 +136,17 @@ def get_presentation_vps(self):
self.credentials_by_issuer[cred_iss].append(_vp.payload['vp'])

return self._vps

@property
def vps(self) -> list[dict]:
"""Returns the presentation's verifiable presentations"""
if not self._vps:
self.get_presentation_vps()
return self._vps

@property
def payload(self) -> dict:
"""Returns the decoded payload of presentation"""
if not self._payload:
self._decode_payload()
return self._payload
67 changes: 43 additions & 24 deletions pyeudiw/openid4vp/vp.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,62 @@

from pyeudiw.jwt.utils import decode_jwt_payload, decode_jwt_header
from pyeudiw.openid4vp.vp_sd_jwt import VpSdJwt


class Vp(VpSdJwt):
"Class for SD-JWT Format"
def __init__(self, jwt: str) -> None:
"""
Generates a VP istance.
def __init__(self, jwt: str):
# TODO: what if the credential is not a JWT?
self.headers = decode_jwt_header(jwt)
self.jwt = jwt
self.payload = decode_jwt_payload(jwt)
:param jwt: a string that represents the jwt.
:type jwt: str
self.credential_headers: dict = {}
self.credential_payload: dict = {}
:raises InvalidVPToken: if the jwt field's value is not a JWT.
"""
super().__init__(jwt)

self.parse_digital_credential()
self.disclosed_user_attributes: dict = {}

def _detect_vp_type(self):
# TODO - automatic detection of the credential
return 'jwt'

def get_credential_jwks(self):
def _detect_vp_type(self) -> str:
"""
Detects and return the type of verifiable presentation.
:returns: the type of VP.
:rtype: str
"""
return self.headers["typ"].lower()

def get_credential_jwks(self) -> list[dict]:
"""
Returns the credential JWKs.
:returns: the list containing credential's JWKs.
:rtype: list[dict]
"""
if not self.credential_jwks:
return {}
return self.credential_jwks

@property
def credential_issuer(self):
if not self.credential_payload.get('iss', None):
self.parse_digital_credential()
return self.credential_payload.get('iss', None)

def parse_digital_credential(self):
def parse_digital_credential(self) -> None:
"""
Parse the digital credential of VP.
:raises NotImplementedError: if VP Digital credentials type not implemented.
"""
_typ = self._detect_vp_type()
if _typ == 'jwt':
self.credential_headers = decode_jwt_header(self.payload['vp'])
self.credential_payload = decode_jwt_payload(self.payload['vp'])
else:

if _typ != 'jwt':
raise NotImplementedError(
f"VP Digital credentials type not implemented yet: {_typ}"
)

self.credential_headers = decode_jwt_header(self.payload['vp'])
self.credential_payload = decode_jwt_payload(self.payload['vp'])

@property
def credential_issuer(self) -> str:
"""Returns the credential issuer"""
if not self.credential_payload.get('iss', None):
self.parse_digital_credential()
return self.credential_payload.get('iss', None)
Loading

0 comments on commit f305d22

Please sign in to comment.