Skip to content

Commit

Permalink
[Feat/documentation] Docs for jwk, jwt and oauth packages (#201)
Browse files Browse the repository at this point in the history
* feat: added policy apply on metadata

* test: added intial tests for TrustEvaluationHelper

* fix: fixed validation issues

* feat: implemented method add_trust_attestation_metadata

* test: added test for add_trust_attestation_metadata

* fix: added metadata association by metadata_type field

* fix: minor fix to test for add_trust_attestation_metadata's data type

* chore: renamed test file

* chore: Removed comment

* fix: fixed x509 verification exception handling

* chore: fix typo

* fix: merged federation and metadata policy implementation

* test: adapted tests

* feat: added final_metadata property

* feat: added chain discovery plus refactoring

* docs: documented file class and functions

* fix: fixed trust_anchor_entity_conf handling

* docs: documented trust_chain_builder.py

* fix: moved implementation of get_http_url in utils.py

* fix: fixed response handling

* docs: documented file class and function plus refactoring

* docs: documented file __init__.py

* docs: added docs for http_client.py

* docs: documented the content of __init__.py

* docs: documented contento of __init__.py

* fix: method name refactoring

* fix: added exception

* fix: refactored method find_jwk

* docs: fixed documentation

* fix: refactoring

* docs: documented content of utils.py

* docs: documented __init__.py content

---------

Co-authored-by: Giuseppe De Marco <[email protected]>
  • Loading branch information
PascalDR and Giuseppe De Marco authored Dec 12, 2023
1 parent 78c00cb commit 3d2ced7
Show file tree
Hide file tree
Showing 18 changed files with 348 additions and 115 deletions.
57 changes: 36 additions & 21 deletions pyeudiw/federation/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
EntityConfigurationHeader,
EntityStatementPayload
)
from pyeudiw.jwt.utils import unpad_jwt_payload, unpad_jwt_header
from pyeudiw.jwt.utils import decode_jwt_payload, decode_jwt_header
from pyeudiw.jwt import JWSHelper
from pyeudiw.tools.utils import get_http_url
from pydantic import ValidationError

from pyeudiw.jwk import find_jwk

import json
import logging
Expand Down Expand Up @@ -135,8 +136,8 @@ def __init__(self, jwt: str, httpc_params: dict):
"""

self.jwt = jwt
self.header = unpad_jwt_header(jwt)
self.payload = unpad_jwt_payload(jwt)
self.header = decode_jwt_header(jwt)
self.payload = decode_jwt_payload(jwt)

self.id = self.payload["id"]
self.sub = self.payload["sub"]
Expand Down Expand Up @@ -165,15 +166,19 @@ def validate_by(self, ec: dict) -> bool:
f"Trust Mark validation failed: "
f"{e}"
)

_kid = self.header["kid"]


if self.header.get("kid") not in ec.kids:
if _kid not in ec.kids:
raise UnknownKid( # pragma: no cover
f"Trust Mark validation failed: "
f"{self.header.get('kid')} not found in {ec.jwks}"
)

_jwk = find_jwk(_kid, ec.jwks)

# verify signature
jwsh = JWSHelper(ec.jwks[ec.kids.index(self.header["kid"])])
jwsh = JWSHelper(_jwk)
payload = jwsh.verify(self.jwt)
self.is_valid = True
return payload
Expand All @@ -189,13 +194,15 @@ def validate_by_its_issuer(self) -> bool:
self.issuer_entity_configuration = get_entity_configurations(
self.iss, self.httpc_params, False
)

_kid = self.header.get('kid')
try:
ec = EntityStatement(self.issuer_entity_configuration[0])
ec.validate_by_itself()
except UnknownKid:
logger.warning(
f"Trust Mark validation failed by its Issuer: "
f"{self.header.get('kid')} not found in "
f"{_kid} not found in "
f"{self.issuer_entity_configuration.jwks}")
return False
except Exception:
Expand All @@ -205,7 +212,8 @@ def validate_by_its_issuer(self) -> bool:
return False

# verify signature
jwsh = JWSHelper(ec.jwks[ec.kids.index(self.header["kid"])])
_jwk = find_jwk(_kid, ec.jwks)
jwsh = JWSHelper(_jwk)
payload = jwsh.verify(self.jwt)
self.is_valid = True
return payload
Expand Down Expand Up @@ -241,8 +249,8 @@ def __init__(
:param trust_mark_issuers_entity_confs: the list containig the trust mark's entiity confs
"""
self.jwt = jwt
self.header = unpad_jwt_header(jwt)
self.payload = unpad_jwt_payload(jwt)
self.header = decode_jwt_header(jwt)
self.payload = decode_jwt_payload(jwt)
self.sub = self.payload["sub"]
self.iss = self.payload["iss"]
self.exp = self.payload["exp"]
Expand Down Expand Up @@ -300,11 +308,15 @@ def validate_by_itself(self) -> bool:
f"{e}"
)

if self.header.get("kid") not in self.kids:
_kid = self.header.get("kid")

if _kid not in self.kids:
raise UnknownKid(
f"{self.header.get('kid')} not found in {self.jwks}") # pragma: no cover
f"{_kid} not found in {self.jwks}") # pragma: no cover

# verify signature
jwsh = JWSHelper(self.jwks[self.kids.index(self.header["kid"])])
_jwk = find_jwk(_kid, self.jwks)
jwsh = JWSHelper(_jwk)
jwsh.verify(self.jwt)
self.is_valid = True
return True
Expand Down Expand Up @@ -501,8 +513,8 @@ def validate_descendant_statement(self, jwt: str) -> bool:
:returns: True if is valid or False otherwise
:rtype: bool
"""
header = unpad_jwt_header(jwt)
payload = unpad_jwt_payload(jwt)
header = decode_jwt_header(jwt)
payload = decode_jwt_payload(jwt)

try:
EntityConfigurationHeader(**header)
Expand All @@ -520,12 +532,15 @@ def validate_descendant_statement(self, jwt: str) -> bool:
f"{e}"
)

if header.get("kid") not in self.kids:
_kid = header.get("kid")

if _kid not in self.kids:
raise UnknownKid(
f"{self.header.get('kid')} not found in {self.jwks}")
f"{_kid} not found in {self.jwks}")

# verify signature
jwsh = JWSHelper(self.jwks[self.kids.index(header["kid"])])
_jwk = find_jwk(_kid, self.jwks)
jwsh = JWSHelper(_jwk)
payload = jwsh.verify(jwt)

self.verified_descendant_statements[payload["sub"]] = payload
Expand All @@ -546,13 +561,13 @@ def validate_by_superior_statement(self, jwt: str, ec: 'EntityStatement') -> str
is_valid = None
payload = {}
try:
payload = unpad_jwt_payload(jwt)
payload = decode_jwt_payload(jwt)
ec.validate_by_itself()
ec.validate_descendant_statement(jwt)
_jwks = get_federation_jwks(payload)
_kids = [i.get("kid") for i in _jwks]
_jwk = find_jwk(self.header["kid"], _jwks)

jwsh = JWSHelper(_jwks[_kids.index(self.header["kid"])])
jwsh = JWSHelper(_jwk)
payload = jwsh.verify(self.jwt)

is_valid = True
Expand Down
54 changes: 19 additions & 35 deletions pyeudiw/federation/trust_chain_validator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from pyeudiw.tools.utils import iat_now
from pyeudiw.jwt import JWSHelper
from pyeudiw.jwt.utils import unpad_jwt_payload, unpad_jwt_header
from pyeudiw.jwt.utils import decode_jwt_payload, decode_jwt_header
from pyeudiw.federation import is_es
from pyeudiw.federation.policy import TrustChainPolicy
from pyeudiw.federation.statements import (
Expand All @@ -15,27 +15,10 @@
KeyValidationError
)

logger = logging.getLogger(__name__)


def find_jwk(kid: str, jwks: list[dict]) -> dict:
"""
Find the JWK with the indicated kid in the jwks list.
:param kid: the identifier of the jwk
:type kid: str
:param jwks: the list of jwks
:type jwks: list[dict]
from pyeudiw.jwk import find_jwk
from pyeudiw.jwk.exceptions import KidNotFoundError, InvalidKid

:returns: the jwk with the indicated kid or an empty dict if no jwk is found
:rtype: dict
"""
if not kid:
return {}
for jwk in jwks:
valid_jwk = jwk.get("kid", None)
if valid_jwk and kid == valid_jwk:
return jwk
logger = logging.getLogger(__name__)


class StaticTrustChainValidator:
Expand Down Expand Up @@ -141,8 +124,8 @@ def validate(self) -> bool:
# inspect the entity statement kid header to know which
# TA's public key to use for the validation
last_element = rev_tc[0]
es_header = unpad_jwt_header(last_element)
es_payload = unpad_jwt_payload(last_element)
es_header = decode_jwt_header(last_element)
es_payload = decode_jwt_payload(last_element)

ta_jwk = find_jwk(
es_header.get("kid", None), self.trust_anchor_jwks
Expand All @@ -169,13 +152,14 @@ def validate(self) -> bool:
# validate the entire chain taking in cascade using fed_jwks
# if valid -> update fed_jwks with $st
for st in rev_tc[1:]:
st_header = unpad_jwt_header(st)
st_payload = unpad_jwt_payload(st)
jwk = find_jwk(
st_header.get("kid", None), fed_jwks
)
st_header = decode_jwt_header(st)
st_payload = decode_jwt_payload(st)

if not jwk:
try:
jwk = find_jwk(
st_header.get("kid", None), fed_jwks
)
except (KidNotFoundError, InvalidKid):
return False

jwsh = JWSHelper(jwk)
Expand Down Expand Up @@ -237,7 +221,7 @@ def _update_st(self, st: str) -> str:
:returns: the entity statement in form of JWT.
:rtype: str
"""
payload = unpad_jwt_payload(st)
payload = decode_jwt_payload(st)
iss = payload['iss']
if not is_es(payload):
# It's an entity configuration
Expand All @@ -251,7 +235,7 @@ def _update_st(self, st: str) -> str:
)
else:
ec = self._retrieve_ec(iss)
ec_data = unpad_jwt_payload(ec)
ec_data = decode_jwt_payload(ec)
fetch_api_url = None

try:
Expand Down Expand Up @@ -290,7 +274,7 @@ def update(self) -> bool:
for st in self.static_trust_chain:
jwt = self._update_st(st)

exp = unpad_jwt_payload(jwt)["exp"]
exp = decode_jwt_payload(jwt)["exp"]
self.set_exp(exp)

self.updated_trust_chain.append(jwt)
Expand All @@ -316,18 +300,18 @@ def is_expired(self) -> int:
def entity_id(self) -> str:
"""Get the chain's entity_id."""
chain = self.trust_chain
payload = unpad_jwt_payload(chain[0])
payload = decode_jwt_payload(chain[0])
return payload["iss"]

@property
def final_metadata(self) -> dict:
"""Apply the metadata and returns the final metadata."""
anchor = self.static_trust_chain[-1]
es_anchor_payload = unpad_jwt_payload(anchor)
es_anchor_payload = decode_jwt_payload(anchor)

policy = es_anchor_payload.get("metadata_policy", {})

leaf = self.static_trust_chain[0]
es_leaf_payload = unpad_jwt_payload(leaf)
es_leaf_payload = decode_jwt_payload(leaf)

return TrustChainPolicy().apply_policy(es_leaf_payload["metadata"], policy)
77 changes: 72 additions & 5 deletions pyeudiw/jwk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,38 @@
from cryptojwt.jwk.jwk import key_from_jwk_dict
from cryptojwt.jwk.rsa import new_rsa_key

from .exceptions import InvalidKid, KidNotFoundError

KEY_TYPES_FUNC = dict(
EC=new_ec_key,
RSA=new_rsa_key
)


class JWK():
"""
The class representing a JWK istance
"""

def __init__(
self,
key: Union[dict, None] = None,
key_type: str = "EC",
hash_func: str = 'SHA-256',
ec_crv: str = "P-256"
) -> None:
"""
Creates an instance of JWK.
:param key: An optional key in dict form.
If no key is provided a randomic key will be generated.
:type key: Union[dict, None]
:param key_type: a string that represents the key type. Can be EC or RSA.
:type key_type: str
:param hash_func: a string that represents the hash function to use with the instance.
:type hash_func: str
:param ec_crv: a string that represents the curve to use with the instance.
:type ec_crv: str
"""
kwargs = {}
self.kid = ""

Expand All @@ -46,10 +63,22 @@ def __init__(
self.public_key = self.key.serialize()
self.public_key['kid'] = self.jwk["kid"]

def as_json(self):
def as_json(self) -> str:
"""
Returns the JWK in format of json string.
:returns: A json string that represents the key.
:rtype: str
"""
return json.dumps(self.jwk)

def export_private_pem(self):
def export_private_pem(self) -> str:
"""
Returns the JWK in format of a private pem certificte.
:returns: A private pem certificate that represents the key.
:rtype: str
"""
_k = key_from_jwk_dict(self.jwk)
pk = _k.private_key()
pem = pk.private_bytes(
Expand All @@ -59,7 +88,13 @@ def export_private_pem(self):
)
return pem.decode()

def export_public_pem(self):
def export_public_pem(self) -> str:
"""
Returns the JWK in format of a public pem certificte.
:returns: A public pem certificate that represents the key.
:rtype: str
"""
_k = key_from_jwk_dict(self.jwk)
pk = _k.public_key()
cert = pk.public_bytes(
Expand All @@ -68,9 +103,41 @@ def export_public_pem(self):
)
return cert.decode()

def as_dict(self):
def as_dict(self) -> dict:
"""
Returns the JWK in format of dict.
:returns: The key in form of dict.
:rtype: dict
"""
return self.jwk

def __repr__(self):
# private part!
return self.as_json()

def find_jwk(kid: str, jwks: list[dict], as_dict: bool=True) -> dict | JWK:
"""
Find the JWK with the indicated kid in the jwks list.
:param kid: the identifier of the jwk
:type kid: str
:param jwks: the list of jwks
:type jwks: list[dict]
:param as_dict: if True the return type will be a dict, JWK otherwise.
:type as_dict: bool
:raises InvalidKid: if kid is None.
:raises KidNotFoundError: if kid is not in jwks list.
:returns: the jwk with the indicated kid or an empty dict if no jwk is found
:rtype: dict | JWK
"""
if not kid:
raise InvalidKid("Kid cannot be empty")
for jwk in jwks:
valid_jwk = jwk.get("kid", None)
if valid_jwk and kid == valid_jwk:
return jwk if as_dict else JWK(jwk)

raise KidNotFoundError(f"Key with Kid {kid} not found")
Loading

0 comments on commit 3d2ced7

Please sign in to comment.