Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Fast enum cache updates #1094

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions python/composio/client/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,15 @@ def get(self, name: t.Optional[str] = None) -> t.Union[AppModel, t.List[AppModel

return super().get(queries={})

def list_enums(self) -> list[str]:
"""Get just the app names on the server."""
response = self._raise_if_required(
response=self.client.http.get(
str(self.endpoint / "list" / "enums"),
)
)
return response.text.split("\n")
Comment on lines +357 to +364

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The list_enums method splits the response text by newline characters, which might not be robust if the server returns enums with spaces or other special characters. It should use a more robust method like json.loads if the server intends to return a JSON array of enums.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def list_enums(self) -> list[str]:
"""Get just the app names on the server."""
response = self._raise_if_required(
response=self.client.http.get(
str(self.endpoint / "list" / "enums"),
)
)
return response.text.split("\n")
def list_enums(self) -> list[str]:
"""Get just the app names on the server."""
response = self._raise_if_required(
response=self.client.http.get(
str(self.endpoint / "list" / "enums"),
)
)
return json.loads(response.text)



class TypeModel(BaseModel):
type: str
Expand Down Expand Up @@ -1081,9 +1090,9 @@ class Actions(Collection[ActionModel]):
# TODO: Overload
def get( # type: ignore
self,
actions: t.Optional[t.Sequence[ActionType]] = None,
apps: t.Optional[t.Sequence[AppType]] = None,
tags: t.Optional[t.Sequence[TagType]] = None,
actions: t.Optional[t.Collection[ActionType]] = None,
apps: t.Optional[t.Collection[AppType]] = None,
tags: t.Optional[t.Collection[TagType]] = None,
limit: t.Optional[int] = None,
use_case: t.Optional[str] = None,
allow_all: bool = False,
Expand Down Expand Up @@ -1392,6 +1401,15 @@ def search_for_a_task(
for task in response.json().get("items", [])
]

def list_enums(self) -> list[str]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a docstring to the list_enums method explaining its purpose, return type, and any potential exceptions it might raise.

"""Get just the action names on the server"""
response = self._raise_if_required(
response=self.client.http.get(
str(self.endpoint / "list" / "enums"),
)
)
return response.text.split("\n")
Comment on lines +1404 to +1411

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as above. response.text.split is not robust and should be replaced with json.loads if the server returns a JSON array.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def list_enums(self) -> list[str]:
"""Get just the action names on the server"""
response = self._raise_if_required(
response=self.client.http.get(
str(self.endpoint / "list" / "enums"),
)
)
return response.text.split("\n")
def list_enums(self) -> list[str]:
"""Get just the action names on the server"""
response = self._raise_if_required(
response=self.client.http.get(
str(self.endpoint / "list" / "enums"),
)
)
return json.loads(response.text)



class ExpectedFieldInput(BaseModel):
name: str
Expand Down
21 changes: 2 additions & 19 deletions python/composio/client/enums/action.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import typing as t
import warnings

from composio.client.enums.base import ActionData, replacement_action_name
from composio.client.enums.base import ActionData, create_action
from composio.client.enums.enum import Enum, EnumGenerator
from composio.exceptions import ComposioSDKError

Expand Down Expand Up @@ -72,24 +72,7 @@ def fetch_and_cache(self) -> t.Optional[ActionData]:
if "appName" not in response:
return None

replaced_by = replacement_action_name(
response["description"], response["appName"]
)
return ActionData( # type: ignore
name=response["name"],
app=response["appName"],
tags=response["tags"],
no_auth=(
client.http.get(url=str(client.apps.endpoint / response["appName"]))
.json()
.get("no_auth", False)
),
is_local=False,
is_runtime=False,
shell=False,
path=self.storage_path,
replaced_by=replaced_by,
)
return create_action(client, response, self.storage_path)

@property
def name(self) -> str:
Comment on lines 72 to 78

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential Issue:

Ensure create_action Functionality Matches Original Logic
The refactoring introduces a call to create_action, replacing a block of code that constructs an ActionData object. This change is high-risk as it may introduce logical errors if create_action does not replicate the original behavior, especially concerning the no_auth and replaced_by fields.

Actionable Steps:

  • Review create_action Implementation: Ensure it correctly handles the no_auth and replaced_by fields as the original code did.
  • Test Edge Cases: Verify that all edge cases covered by the original logic are addressed in the new function.
  • Documentation: Update any relevant documentation to reflect changes in logic or function usage.

This will help maintain the integrity of the application and prevent potential data inconsistencies. 🛠️


Expand Down
23 changes: 23 additions & 0 deletions python/composio/client/enums/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@

import difflib
import typing as t
from pathlib import Path

from composio.constants import LOCAL_CACHE_DIRECTORY
from composio.exceptions import ComposioSDKError
from composio.storage.base import LocalStorage


if t.TYPE_CHECKING:
from composio.client import Composio

_runtime_actions: t.Dict[str, "ActionData"] = {}

EntityType = t.TypeVar("EntityType", bound=LocalStorage)
Expand Down Expand Up @@ -118,3 +122,22 @@ def replacement_action_name(description: str, app_name: str) -> t.Optional[str]:
return (app_name + "_" + newact.replace(">>", "")).upper()

return None


def create_action(
client: "Composio",
response: dict[str, t.Any],
storage_path: Path,
) -> ActionData:
replaced_by = replacement_action_name(response["description"], response["appName"])
return ActionData( # type: ignore
name=response["name"],
app=response["appName"],
tags=response["tags"],
no_auth=response["no_auth"],
is_local=False,
is_runtime=False,
shell=False,
path=storage_path,
replaced_by=replaced_by,
)
23 changes: 6 additions & 17 deletions python/composio/client/enums/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,13 @@ def __eq__(self, other: object) -> bool:
return False

@classmethod
def iter(cls) -> t.Iterator[str]:
def iter(cls) -> t.Iterable[str]:
"""Yield the enum names as strings."""
# TODO: fetch trigger names from dedicated endpoint in the future
path = LOCAL_CACHE_DIRECTORY / cls.cache_folder
# If we try to fetch Actions.iter() with local caching disabled
# for example, we'd get here.
if not path.exists():
# pylint: disable=import-outside-toplevel
from composio.client import Composio

# pylint: disable=import-outside-toplevel
from composio.client.utils import check_cache_refresh

check_cache_refresh(Composio.get_latest())
if not path.exists():
return

yield from os.listdir(path)
# pylint: disable=import-outside-toplevel
from composio.client import Composio

client = Composio.get_latest()
return client.actions.list_enums()

@classmethod
def all(cls) -> t.Iterator[te.Self]:
Expand Down
89 changes: 77 additions & 12 deletions python/composio/client/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
import os
import time
import typing as t

from composio.client import Composio, enums
from composio.client.collections import ActionModel, AppModel, TriggerModel
from composio.client.enums.base import replacement_action_name
from composio.client.enums.base import AppData, create_action, replacement_action_name
from composio.tools.local import load_local_tools
from composio.utils import get_enum_key
from composio.utils.logging import get_logger
Expand Down Expand Up @@ -208,6 +209,9 @@ def _update_triggers(
).store()


_cache_checked = False


def check_cache_refresh(client: Composio) -> None:
"""
Check if the actions have a 'replaced_by' field and refresh the cache if not.
Expand All @@ -218,18 +222,79 @@ def check_cache_refresh(client: Composio) -> None:
SDK version, and didn't come from the API. We need to start storing the data
from the API and invalidate the cache if the data is not already stored.
"""
global _cache_checked
if _cache_checked:
return

_cache_checked = True

t0 = time.monotonic()
if NO_CACHE_REFRESH:
return

local_actions = []
if enums.base.ACTIONS_CACHE.exists():
actions = list(enums.base.ACTIONS_CACHE.iterdir())
for action in actions:
action_data = json.loads(action.read_text())
# The action file could be old. If it doesn't have a
# replaced_by field, we want to overwrite it.
if "replaced_by" not in action_data:
action.unlink()
continue

local_actions.append(action.stem)

api_actions = client.actions.list_enums()
actions_to_update = set(api_actions) - set(local_actions)
actions_to_delete = set(local_actions) - set(api_actions)
logger.debug("Actions to fetch: %s", actions_to_update)
logger.debug("Stale actions: %s", actions_to_delete)

for action_name in actions_to_delete:
(enums.base.ACTIONS_CACHE / action_name).unlink()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for the case where the file unlink operation fails. This could happen due to permission issues or if the file is locked by another process.


if len(actions_to_update) > 50:
# Major update, refresh everything
apps = update_apps(client)
update_actions(client, apps)
update_triggers(client, apps)
return

local_apps = []
if enums.base.ACTIONS_CACHE.exists():
first_file = next(enums.base.ACTIONS_CACHE.iterdir(), None)
if first_file is not None:
first_action = json.loads(first_file.read_text())
if "replaced_by" in first_action:
logger.debug("Actions cache is up-to-date")
return

logger.info("Actions cache is outdated, refreshing cache...")
apps = update_apps(client)
update_actions(client, apps)
update_triggers(client, apps)
local_apps = list(path.stem for path in enums.base.APPS_CACHE.iterdir())

api_apps = client.apps.list_enums()
breakpoint()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the breakpoint() call. It seems to be left over from debugging and is not suitable for production code.

apps_to_update = set(api_apps) - set(local_apps)
apps_to_delete = set(local_apps) - set(api_apps)
logger.debug("Apps to fetch: %s", apps_to_update)
logger.debug("Stale apps: %s", apps_to_delete)

# for app_name in apps_to_delete:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider removing or uncommenting the code for handling apps_to_delete and apps_to_update. Leaving it commented can lead to confusion and clutter.

# (enums.base.APPS_CACHE / app_name).unlink()

# if apps_to_update:
# apps_data = client.http.get(
# str(client.apps.endpoint(queries={"apps": ",".join(apps_to_update)}))
# ).json()
# for app_data in apps_data["items"]:
# storage_path = enums.base.APPS_CACHE / app_data["name"]
# AppData(name=app_data["name"], path=storage_path, is_local=False).store()

# if actions_to_update:
# actions_data = client.http.get(
# str(
# client.actions.endpoint(
# queries={"actions": ",".join(actions_to_update)}
# )
# )
# ).json()
# for action_data in actions_data["items"]:
# storage_path = enums.base.ACTIONS_CACHE / action_data["name"]
# create_action(
# client, response=action_data, storage_path=storage_path
# ).store()
Comment on lines +268 to +298

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check_cache_refresh function contains commented-out code blocks and a breakpoint. These should be removed before merging.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
api_apps = client.apps.list_enums()
breakpoint()
apps_to_update = set(api_apps) - set(local_apps)
apps_to_delete = set(local_apps) - set(api_apps)
logger.debug("Apps to fetch: %s", apps_to_update)
logger.debug("Stale apps: %s", apps_to_delete)
# for app_name in apps_to_delete:
# (enums.base.APPS_CACHE / app_name).unlink()
# if apps_to_update:
# apps_data = client.http.get(
# str(client.apps.endpoint(queries={"apps": ",".join(apps_to_update)}))
# ).json()
# for app_data in apps_data["items"]:
# storage_path = enums.base.APPS_CACHE / app_data["name"]
# AppData(name=app_data["name"], path=storage_path, is_local=False).store()
# if actions_to_update:
# actions_data = client.http.get(
# str(
# client.actions.endpoint(
# queries={"actions": ",".join(actions_to_update)}
# )
# )
# ).json()
# for action_data in actions_data["items"]:
# storage_path = enums.base.ACTIONS_CACHE / action_data["name"]
# create_action(
# client, response=action_data, storage_path=storage_path
# ).store()
api_apps = client.apps.list_enums()
apps_to_update = set(api_apps) - set(local_apps)
apps_to_delete = set(local_apps) - set(api_apps)
logger.debug("Apps to fetch: %s", apps_to_update)
logger.debug("Stale apps: %s", apps_to_delete)
for app_name in apps_to_delete:
(enums.base.APPS_CACHE / app_name).unlink()
if apps_to_update:
apps_data = client.http.get


logger.debug("Time taken to update cache: %.2f seconds", time.monotonic() - t0)
Loading