Skip to content

Commit

Permalink
fix: varius minor fixs
Browse files Browse the repository at this point in the history
  • Loading branch information
PascalDR committed Dec 20, 2023
1 parent 3c084a5 commit 3a741d9
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions pyeudiw/storage/mongo_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, conf: dict, url: str, connection_params: dict = {}) -> None:
self.db = None

@property
def is_connected(self):
def is_connected(self) -> bool:
if not self.client:
return False
try:
Expand Down Expand Up @@ -80,7 +80,7 @@ def get_by_nonce_state(self, nonce: str, state: str | None) -> dict:

return document

def get_by_session_id(self, session_id: str):
def get_by_session_id(self, session_id: str) -> Union[dict, None]:
self._connect()
query = {"session_id": session_id}
document = self.sessions.find_one(query)
Expand All @@ -92,7 +92,7 @@ def get_by_session_id(self, session_id: str):

return document

def get_by_state_and_session_id(self, state: str, session_id: str = ""):
def get_by_state_and_session_id(self, state: str, session_id: str = "") -> Union[dict, None]:
self._connect()
query = {"state": state}
if session_id:
Expand Down Expand Up @@ -125,7 +125,7 @@ def init_session(self, document_id: str, session_id: str, state: str) -> str:

return document_id

def add_dpop_proof_and_attestation(self, document_id: str, dpop_proof: dict, attestation: dict):
def add_dpop_proof_and_attestation(self, document_id: str, dpop_proof: dict, attestation: dict) -> UpdateResult:
self._connect()
update_result: UpdateResult = self.sessions.update_one(
{"document_id": document_id},
Expand All @@ -142,7 +142,7 @@ def add_dpop_proof_and_attestation(self, document_id: str, dpop_proof: dict, att

return update_result

def update_request_object(self, document_id: str, request_object: dict):
def update_request_object(self, document_id: str, request_object: dict) -> UpdateResult:
self.get_by_id(document_id)
documentStatus = self.sessions.update_one(
{"document_id": document_id},
Expand Down Expand Up @@ -177,7 +177,7 @@ def set_finalized(self, document_id: str):
)
return update_result

def update_response_object(self, nonce: str, state: str, internal_response: dict):
def update_response_object(self, nonce: str, state: str, internal_response: dict) -> UpdateResult:
document = self.get_by_nonce_state(nonce, state)
document_id = document["_id"]
document_status = self.sessions.update_one(
Expand All @@ -190,24 +190,24 @@ def update_response_object(self, nonce: str, state: str, internal_response: dict

return document_status

def _get_trust_attestation(self, collection: str, entity_id: str) -> dict:
def _get_trust_attestation(self, collection: str, entity_id: str) -> dict | None:
self._connect()
db_collection = getattr(self, collection)
return db_collection.find_one({"entity_id": entity_id})

def get_trust_attestation(self, entity_id: str):
def get_trust_attestation(self, entity_id: str) -> dict | None:
return self._get_trust_attestation("trust_attestations", entity_id)

def get_trust_anchor(self, entity_id: str):
def get_trust_anchor(self, entity_id: str) -> dict | None:
return self._get_trust_attestation("trust_anchors", entity_id)

def _has_trust_attestation(self, collection: str, entity_id: str):
return self._get_trust_attestation(collection, entity_id)
def _has_trust_attestation(self, collection: str, entity_id: str) -> bool:
return self._get_trust_attestation(collection, entity_id) != None

def has_trust_attestation(self, entity_id: str):
def has_trust_attestation(self, entity_id: str) -> bool:
return self._has_trust_attestation("trust_attestations", entity_id)

def has_trust_anchor(self, entity_id: str):
def has_trust_anchor(self, entity_id: str) -> bool:
return self._has_trust_attestation("trust_anchors", entity_id)

def _add_entry(
Expand Down Expand Up @@ -257,7 +257,7 @@ def _update_anchor_metadata(self, entity: dict, attestation: list[str], exp: dat

return entity

def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType):
def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: datetime, trust_type: TrustType) -> str:
entity = {
"entity_id": entity_id,
"federation": {},
Expand All @@ -267,7 +267,7 @@ def add_trust_attestation(self, entity_id: str, attestation: list[str], exp: dat

updated_entity = self._update_attestation_metadata(entity, attestation, exp, trust_type)

self._add_entry(
return self._add_entry(
"trust_attestations", entity_id, updated_entity, exp
)

Expand All @@ -285,7 +285,7 @@ def add_trust_attestation_metadata(self, entity_id: str, metadata_type: str, met

def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datetime, trust_type: TrustType):
if self.has_trust_anchor(entity_id):
self.update_trust_anchor(entity_id, entity_configuration, exp, trust_type)
return self.update_trust_anchor(entity_id, entity_configuration, exp, trust_type)
else:
entity = {
"entity_id": entity_id,
Expand All @@ -294,7 +294,7 @@ def add_trust_anchor(self, entity_id: str, entity_configuration: str, exp: datet
}

updated_entity = self._update_anchor_metadata(entity, entity_configuration, exp, trust_type)
self._add_entry("trust_anchors", entity_id, updated_entity, exp)
return self._add_entry("trust_anchors", entity_id, updated_entity, exp)

def _update_trust_attestation(self, collection: str, entity_id: str, entity: dict) -> str:
if not self._has_trust_attestation(collection, entity_id):
Expand Down

0 comments on commit 3a741d9

Please sign in to comment.