diff --git a/pyeudiw/storage/mongo_storage.py b/pyeudiw/storage/mongo_storage.py index 541c5471..eda332cc 100644 --- a/pyeudiw/storage/mongo_storage.py +++ b/pyeudiw/storage/mongo_storage.py @@ -27,7 +27,7 @@ def __init__(self, conf: dict, url: str, connection_params: dict = {}) -> None: self.db = None @property - def is_connected(self): + def is_connected(self) -> bool: if not self.client: return False try: @@ -80,7 +80,7 @@ def get_by_nonce_state(self, nonce: str, state: str | None) -> dict: return document - def get_by_session_id(self, session_id: str): + def get_by_session_id(self, session_id: str) -> Union[dict, None]: self._connect() query = {"session_id": session_id} document = self.sessions.find_one(query) @@ -92,7 +92,7 @@ def get_by_session_id(self, session_id: str): return document - def get_by_state_and_session_id(self, state: str, session_id: str = ""): + def get_by_state_and_session_id(self, state: str, session_id: str = "") -> Union[dict, None]: self._connect() query = {"state": state} if session_id: @@ -125,7 +125,7 @@ def init_session(self, document_id: str, session_id: str, state: str) -> str: return document_id - def add_dpop_proof_and_attestation(self, document_id: str, dpop_proof: dict, attestation: dict): + def add_dpop_proof_and_attestation(self, document_id: str, dpop_proof: dict, attestation: dict) -> UpdateResult: self._connect() update_result: UpdateResult = self.sessions.update_one( {"document_id": document_id}, @@ -142,7 +142,7 @@ def add_dpop_proof_and_attestation(self, document_id: str, dpop_proof: dict, att return update_result - def update_request_object(self, document_id: str, request_object: dict): + def update_request_object(self, document_id: str, request_object: dict) -> UpdateResult: self.get_by_id(document_id) documentStatus = self.sessions.update_one( {"document_id": document_id}, @@ -177,7 +177,7 @@ def set_finalized(self, document_id: str): ) return update_result - def update_response_object(self, nonce: str, state: str, internal_response: dict): + def update_response_object(self, nonce: str, state: str, internal_response: dict) -> UpdateResult: document = self.get_by_nonce_state(nonce, state) document_id = document["_id"] document_status = self.sessions.update_one( @@ -190,24 +190,24 @@ def update_response_object(self, nonce: str, state: str, internal_response: dict return document_status - def _get_trust_attestation(self, collection: str, entity_id: str) -> dict: + def _get_trust_attestation(self, collection: str, entity_id: str) -> dict | None: self._connect() db_collection = getattr(self, collection) return db_collection.find_one({"entity_id": entity_id}) - def get_trust_attestation(self, entity_id: str): + def get_trust_attestation(self, entity_id: str) -> dict | None: return self._get_trust_attestation("trust_attestations", entity_id) - def get_trust_anchor(self, entity_id: str): + def get_trust_anchor(self, entity_id: str) -> dict | None: return self._get_trust_attestation("trust_anchors", entity_id) - def _has_trust_attestation(self, collection: str, entity_id: str): - return self._get_trust_attestation(collection, entity_id) + def _has_trust_attestation(self, collection: str, entity_id: str) -> bool: + return self._get_trust_attestation(collection, entity_id) != None - def has_trust_attestation(self, entity_id: str): + def has_trust_attestation(self, entity_id: str) -> bool: return self._has_trust_attestation("trust_attestations", entity_id) - def has_trust_anchor(self, entity_id: str): + def has_trust_anchor(self, entity_id: str) -> bool: return self._has_trust_attestation("trust_anchors", entity_id) def _add_entry( @@ -257,7 +257,7 @@ def _update_anchor_metadata(self, entity: dict, attestation: list[str], exp: dat return entity - def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType): + def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType) -> str: entity = { "entity_id": entity_id, "federation": {}, @@ -267,7 +267,7 @@ def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: dat updated_entity = self._update_attestation_metadata(entity, attestation, exp, trust_type) - self._add_entry( + return self._add_entry( "trust_attestations", entity_id, updated_entity, exp ) @@ -285,7 +285,7 @@ def add_trust_attestation_metadata(self, entity_id: str, metadata_type: str, met def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime, trust_type: TrustType): if self.has_trust_anchor(entity_id): - self.update_trust_anchor(entity_id, entity_configuration, exp, trust_type) + return self.update_trust_anchor(entity_id, entity_configuration, exp, trust_type) else: entity = { "entity_id": entity_id, @@ -294,7 +294,7 @@ def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datet } updated_entity = self._update_anchor_metadata(entity, entity_configuration, exp, trust_type) - self._add_entry("trust_anchors", entity_id, updated_entity, exp) + return self._add_entry("trust_anchors", entity_id, updated_entity, exp) def _update_trust_attestation(self, collection: str, entity_id: str, entity: dict) -> str: if not self._has_trust_attestation(collection, entity_id):