Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up github events recollection #2974

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions augur/tasks/github/events.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
import logging
import traceback

Check warning on line 2 in augur/tasks/github/events.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 W0611: Unused import traceback (unused-import) Raw Output: augur/tasks/github/events.py:2:0: W0611: Unused import traceback (unused-import)
import sqlalchemy as s

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[pylint] reported by reviewdog 🐶
W0611: Unused sqlalchemy imported as s (unused-import)

from sqlalchemy.sql import text
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone

from augur.tasks.init.celery_app import celery_app as celery
from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask
from augur.application.db.data_parse import *
from augur.tasks.github.util.github_data_access import GithubDataAccess, UrlNotFoundException
from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
from augur.tasks.github.util.github_task_session import GithubTaskManifest

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[pylint] reported by reviewdog 🐶
W0611: Unused GithubTaskManifest imported from augur.tasks.github.util.github_task_session (unused-import)

from augur.tasks.github.util.util import get_owner_repo
from augur.tasks.util.worker_util import remove_duplicate_dicts
from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, CollectionStatus
from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine
from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, Repo

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[pylint] reported by reviewdog 🐶
W0611: Unused Repo imported from augur.application.db.models (unused-import)

from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine, get_core_data_last_collected

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[pylint] reported by reviewdog 🐶
W0611: Unused get_session imported from augur.application.db.lib (unused-import)



platform_id = 1

@celery.task(base=AugurCoreRepoCollectionTask)
def collect_events(repo_git: str, full_collection: bool):
    """Collect GitHub issue/PR events for one repository.

    Args:
        repo_git: Git URL of the repository, e.g. "https://github.com/owner/repo".
        full_collection: If True, recollect every event; otherwise only
            events newer than the last core data collection.
    """

    logger = logging.getLogger(collect_events.__name__)

    owner, repo = get_owner_repo(repo_git)

    logger.debug(f"Collecting Github events for {owner}/{repo}")

    if full_collection:
        core_data_last_collected = None
    else:
        # BUG FIX (pylint E1120): get_repo_by_repo_git requires the repo_git argument.
        repo_id = get_repo_by_repo_git(repo_git).repo_id

        # subtract 2 days to ensure all data is collected
        core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc)

    key_auth = GithubRandomKeyAuth(logger)

    if bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo):
        # NOTE(review): this line was collapsed in the diff view — restored from
        # context (the else-branch mirrors it with the Thorough strategy); confirm.
        collection_strategy = BulkGithubEventCollection(logger)
    else:
        collection_strategy = ThoroughGithubEventCollection(logger)

    # BUG FIX (pylint E1120): collect() now requires a `since` cutoff; pass the
    # last-collected timestamp (None means a full collection).
    collection_strategy.collect(repo_git, key_auth, core_data_last_collected)

Check warning on line 46 in augur/tasks/github/events.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 E1120: No value for argument 'since' in method call (no-value-for-parameter) Raw Output: augur/tasks/github/events.py:46:4: E1120: No value for argument 'since' in method call (no-value-for-parameter)

def bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo):

Expand All @@ -60,7 +70,7 @@
self._data_source = "Github API"

@abstractmethod
def collect(self, repo_git, key_auth, since):
    """Collect and store GitHub events for *repo_git*.

    Args:
        repo_git: Git URL of the repository.
        key_auth: GitHub API key authenticator used for requests.
        since: Aware datetime cutoff — events created before it may be
            skipped; ``None`` requests a full collection.
    """
    pass

def _insert_issue_events(self, events):
Expand Down Expand Up @@ -97,7 +107,7 @@

super().__init__(logger)

def collect(self, repo_git, key_auth):
def collect(self, repo_git, key_auth, since):

repo_obj = get_repo_by_repo_git(repo_git)
repo_id = repo_obj.repo_id
Expand All @@ -106,7 +116,7 @@
self.repo_identifier = f"{owner}/{repo}"

events = []
for event in self._collect_events(repo_git, key_auth):
for event in self._collect_events(repo_git, key_auth, since):
events.append(event)

# making this a decent size since process_events retrieves all the issues and prs each time
Expand All @@ -117,15 +127,21 @@
if events:
self._process_events(events, repo_id)

def _collect_events(self, repo_git: str, key_auth, since):
    """Yield issue events for the repository, stopping early at *since*.

    The early return assumes the endpoint pages events newest-first, so
    once one event predates the cutoff no later event can be newer —
    TODO confirm ordering against the GitHub issue-events API docs.

    Args:
        repo_git: Git URL of the repository.
        key_auth: GitHub API key authenticator.
        since: Aware datetime cutoff, or None to yield every event.
    """
    owner, repo = get_owner_repo(repo_git)

    url = f"https://api.github.com/repos/{owner}/{repo}/issues/events"

    github_data_access = GithubDataAccess(key_auth, self._logger)

    for event in github_data_access.paginate_resource(url):

        yield event

        # Stop paginating once an event was CREATED before the since date.
        # (BUG FIX: the original comment said "updated", but the code
        # compares created_at — comment corrected to match the check.)
        if since and datetime.fromisoformat(event["created_at"].replace("Z", "+00:00")).replace(tzinfo=timezone.utc) < since:
            return

def _process_events(self, events, repo_id):

Expand Down Expand Up @@ -248,26 +264,30 @@
def __init__(self, logger):
    # Delegate to the shared event-collection base initializer
    # (which records the logger and data-source metadata).
    super().__init__(logger)

def collect(self, repo_git, key_auth, since):
    """Run the thorough strategy: fetch events per issue and per PR.

    Args:
        repo_git: Git URL of the repository.
        key_auth: GitHub API key authenticator.
        since: Aware datetime cutoff forwarded to both collectors
            (None means collect everything).
    """
    repo_record = get_repo_by_repo_git(repo_git)

    owner, repo = get_owner_repo(repo_git)
    self.repo_identifier = f"{owner}/{repo}"

    # Issues first, then pull requests — each helper fetches and stores
    # its own event stream.
    for collector in (self._collect_and_process_issue_events,
                      self._collect_and_process_pr_events):
        collector(owner, repo, repo_record.repo_id, key_auth, since)

def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth, since):

engine = get_engine()

with engine.connect() as connection:

# TODO: Remove src id if it ends up not being needed
query = text(f"""
select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id from issues WHERE repo_id={repo_id} order by created_at desc;
select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id
from issues
where repo_id={repo_id}
and updated_at > timestamptz(timestamp '{since}')
order by created_at desc;
""")

issue_result = connection.execute(query).fetchall()
Expand Down Expand Up @@ -309,14 +329,18 @@
events.clear()


def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth):
def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth, since):

engine = get_engine()

with engine.connect() as connection:

query = text(f"""
select pull_request_id, pr_src_number as gh_pr_number, pr_src_id from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc;
select pull_request_id, pr_src_number as gh_pr_number, pr_src_id
from pull_requests
where repo_id={repo_id}
and pr_updated_at > timestamptz(timestamp '{since}')
order by pr_created_at desc;
""")

pr_result = connection.execute(query).fetchall()
Expand Down
2 changes: 1 addition & 1 deletion augur/tasks/start_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@

#Define secondary group that can't run until after primary jobs have finished.
secondary_repo_jobs = group(
collect_events.si(repo_git),#*create_grouped_task_load(dataList=first_pass, task=collect_events).tasks,
collect_events.si(repo_git, full_collection),#*create_grouped_task_load(dataList=first_pass, task=collect_events).tasks,
collect_github_messages.si(repo_git, full_collection), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks,
collect_github_repo_clones_data.si(repo_git),
)
Expand Down Expand Up @@ -295,7 +295,7 @@
status = repo.collection_status[0]
raw_count = status.issue_pr_sum

issue_pr_task_update_weight_util([int(raw_count)],repo_git=repo_git,session=session)

Check warning on line 298 in augur/tasks/start_tasks.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 E1120: No value for argument 'issue_and_pr_nums' in function call (no-value-for-parameter) Raw Output: augur/tasks/start_tasks.py:298:12: E1120: No value for argument 'issue_and_pr_nums' in function call (no-value-for-parameter)

facade_not_pending = CollectionStatus.facade_status != CollectionState.PENDING.value
facade_not_failed = CollectionStatus.facade_status != CollectionState.FAILED_CLONE.value
Expand Down
Loading