-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Fast enum cache updates #1094
base: master
Are you sure you want to change the base?
Changes from 5 commits
5912085
3d78343
6c30c56
ad087df
659179f
8ce9ea8
1bb796b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
import typing as t | ||
import warnings | ||
|
||
from composio.client.enums.base import ActionData, replacement_action_name | ||
from composio.client.enums.base import ActionData, create_action | ||
from composio.client.enums.enum import Enum, EnumGenerator | ||
from composio.exceptions import ComposioSDKError | ||
|
||
|
@@ -72,24 +72,7 @@ def fetch_and_cache(self) -> t.Optional[ActionData]: | |
if "appName" not in response: | ||
return None | ||
|
||
replaced_by = replacement_action_name( | ||
response["description"], response["appName"] | ||
) | ||
return ActionData( # type: ignore | ||
name=response["name"], | ||
app=response["appName"], | ||
tags=response["tags"], | ||
no_auth=( | ||
client.http.get(url=str(client.apps.endpoint / response["appName"])) | ||
.json() | ||
.get("no_auth", False) | ||
), | ||
is_local=False, | ||
is_runtime=False, | ||
shell=False, | ||
path=self.storage_path, | ||
replaced_by=replaced_by, | ||
) | ||
return create_action(client, response, self.storage_path) | ||
|
||
@property | ||
def name(self) -> str: | ||
Comment on lines
72
to
78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Actionable Steps:
This will help maintain the integrity of the application and prevent potential data inconsistencies. 🛠️ |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,11 @@ | ||
import json | ||
import os | ||
import time | ||
import typing as t | ||
|
||
from composio.client import Composio, enums | ||
from composio.client.collections import ActionModel, AppModel, TriggerModel | ||
from composio.client.enums.base import replacement_action_name | ||
from composio.client.enums.base import create_action, replacement_action_name | ||
from composio.tools.local import load_local_tools | ||
from composio.utils import get_enum_key | ||
from composio.utils.logging import get_logger | ||
|
@@ -208,6 +209,9 @@ def _update_triggers( | |
).store() | ||
|
||
|
||
_cache_checked = False | ||
|
||
|
||
def check_cache_refresh(client: Composio) -> None: | ||
""" | ||
Check if the actions have a 'replaced_by' field and refresh the cache if not. | ||
|
@@ -218,18 +222,52 @@ def check_cache_refresh(client: Composio) -> None: | |
SDK version, and didn't come from the API. We need to start storing the data | ||
from the API and invalidate the cache if the data is not already stored. | ||
""" | ||
global _cache_checked | ||
if _cache_checked: | ||
return | ||
|
||
_cache_checked = True | ||
|
||
t0 = time.monotonic() | ||
if NO_CACHE_REFRESH: | ||
return | ||
|
||
local_actions = [] | ||
if enums.base.ACTIONS_CACHE.exists(): | ||
first_file = next(enums.base.ACTIONS_CACHE.iterdir(), None) | ||
if first_file is not None: | ||
first_action = json.loads(first_file.read_text()) | ||
if "replaced_by" in first_action: | ||
logger.debug("Actions cache is up-to-date") | ||
return | ||
|
||
logger.info("Actions cache is outdated, refreshing cache...") | ||
apps = update_apps(client) | ||
update_actions(client, apps) | ||
update_triggers(client, apps) | ||
actions = list(enums.base.ACTIONS_CACHE.iterdir()) | ||
for action in actions: | ||
action_data = json.loads(action.read_text()) | ||
# The action file could be old. If it doesn't have a | ||
# replaced_by field, we want to overwrite it. | ||
if "replaced_by" not in action_data: | ||
action.unlink() | ||
continue | ||
|
||
local_actions.append(action.stem) | ||
|
||
api_actions = client.actions.list_enums() | ||
|
||
actions_to_update = set(api_actions) - set(local_actions) | ||
actions_to_delete = set(local_actions) - set(api_actions) | ||
logger.debug("Actions to fetch: %s", actions_to_update) | ||
logger.debug("Stale actions: %s", actions_to_delete) | ||
|
||
for action_name in actions_to_delete: | ||
(enums.base.ACTIONS_CACHE / action_name).unlink() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding error handling for the case where the file unlink operation fails. This could happen due to permission issues or if the file is locked by another process. |
||
|
||
if actions_to_update: | ||
# TODO: handle page size limit | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The TODO comment about page size limit is important. Consider adding a maximum batch size for |
||
actions_data = client.http.get( | ||
str( | ||
client.actions.endpoint( | ||
queries={"actions": ",".join(actions_to_update)} | ||
) | ||
) | ||
).json() | ||
for action_data in actions_data["items"]: | ||
storage_path = enums.base.ACTIONS_CACHE / action_data["name"] | ||
create_action( | ||
client, response=action_data, storage_path=storage_path | ||
).store() | ||
|
||
logger.debug("Time taken to update cache: %.2f seconds", time.monotonic() - t0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a docstring to the
list_enums
method explaining its purpose, return type, and any potential exceptions it might raise.