diff --git a/.circleci/config.yml b/.circleci/config.yml index b9171ce69..7358057ab 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -180,6 +180,51 @@ jobs: gsutil rsync -d -r dataproc_bootstrap gs://moz-fx-data-prod-airflow-dataproc-artifacts/bootstrap gsutil rsync -d -r jobs gs://moz-fx-data-prod-airflow-dataproc-artifacts/jobs + sync-dags-repo: + executor: docker/machine + steps: + - checkout + - add_ssh_keys: + fingerprints: + - "e4:30:50:41:53:f0:d6:3a:bb:c9:38:54:2d:ca:56:41" + - run: + name: πŸ€– Update DAGs repository + command: | + ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts + git config --global user.email "dataops@mozilla.com" + git config --global user.name "CircleCI Job" + git clone --branch main git@github.com:mozilla/telemetry-airflow-dags.git + cd ~/project/telemetry-airflow-dags + git submodule update --init --recursive --depth 1 telemetry-airflow + cd ${CIRCLE_PROJECT_REPONAME} + git pull origin dags + cd .. + git add ${CIRCLE_PROJECT_REPONAME} + git commit --allow-empty -m "Automatic commit from ${CIRCLE_PROJECT_REPONAME} commit ${CIRCLE_SHA1:0:9} build ${CIRCLE_BUILD_NUM} [skip ci]" + git push origin main + + sync-dags-branch: + executor: docker/machine + steps: + - checkout + - add_ssh_keys: + fingerprints: + - "66:9a:9f:55:2b:5c:f5:d6:ea:c5:c4:f9:7b:db:8d:c0" + - run: + name: πŸ€– Update DAGs branch + command: | + ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts + git config --global user.name "CircleCI job" + git config --global user.email "dataops@mozilla.com" + git clone --branch dags git@github.com:mozilla/telemetry-airflow.git \ + dags-branch + cd dags-branch/ + rm -rf dags/ + cp -r ~/project/dags dags + git add . + git commit -m "Automatic commit - DAGs from commit ${CIRCLE_SHA1:0:9} build ${CIRCLE_BUILD_NUM} [skip ci]" \ + && git push \ + || echo "Skipping push since it looks like there were no changes" workflows: ci: @@ -297,3 +342,18 @@ workflows: only: /.*/ branches: only: main + + - sync-dags-branch: + name: πŸ”ƒ Update DAGs branch + filters: + branches: + only: main + + - sync-dags-repo: + name: πŸ”ƒ Synchronize telemetry-airflow-dags repository + filters: + branches: + only: main + requires: + - πŸ”ƒ Update DAGs branch + diff --git a/README.md b/README.md index 19a67ca96..eaabb86b2 100644 --- a/README.md +++ b/README.md @@ -25,21 +25,65 @@ Some links relevant to users and developers of WTMO: ## Writing DAGs See the Airflow's [Best Practices guide](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#best-practices) to help you write DAGs. -**⚠ Note: How to import DAGs and modules ⚠** +### **⚠ Warning: Do not import resources from the `dags` directory in DAGs definition files ⚠** +As an example, if you have `dags/dag_a.py` and `dags/dag_b.py` and want to use a helper +function in both DAG definition files, define the helper function in the `utils` directory +such as: + +#### `utils/helper.py` +```python +def helper_function(): + return "Help" +``` + +#### `dags/dag_a.py` +```python +from airflow import DAG + +from utils.helper import helper_function + +with DAG("dag_a", ...): + ... +``` -**Modules should be imported from the project directory, such as `from dags.my_dag import load_data` -rather than `from my_dag import load_data`.** +#### `dags/dag_b.py` +```python +from airflow import DAG -In Airflow, the `dags`, `config`, and `plugins` folders are [automatically added](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/modules_management.html#built-in-pythonpath-entries-in-airflow) to the -`PYTHONPATH` to ensure they can be imported and accessed by Airflow's execution environment. +from utils.helper import helper_function + +with DAG("dag_b", ...): + ... +``` + +WTMO deployments use git-sync sidecars to synchronize DAG files from multiple +repositories via [telemetry-airflow-dags](https://github.com/mozilla/telemetry-airflow-dags/) +using git submodules. Git-sync sidecar pattern results in the following directory structure +once deployed. +``` +airflow +β”œβ”€ dags +β”‚ └── repo +β”‚ └── telemetry-airflow-dags +β”‚ β”œβ”€β”€ +β”‚ β”‚ └── dags +β”‚ β”‚ └── +β”‚ β”œβ”€β”€ +β”‚ β”‚ └── dags +β”‚ β”‚ └── +β”‚ └── +β”‚ └── dags +β”‚ └── +β”œβ”€ utils +β”‚ └── ... +└─ plugins + └── ... +``` +Hence, defining `helper_function()` in `dags/dag_a.py` and +importing the function in `dags/dag_b.py` as `from dags.dag_a import helper_function` +**will not work after deployment** because of the directory structured required for +git-sync sidecars. -However, this default configuration can cause problems when running unit tests located -in the `tests` directory. Since the `PYTHONPATH` includes the `dags` directory, -but not the project directory itself, the unit tests will not be able to import code from -the `dags` directory. This limitation restricts the ability to test the DAGs effectively -within the project structure. It is also generally expected that imports should work from the -project directory rather than from any of its subdirectories. For this reason, `telemetry-airflow`'s -Dockerfile adds the project directory to `PYTHONPATH`. ## Prerequisites diff --git a/dags/adm_export.py b/dags/adm_export.py index 5affd295f..dd8fd1b14 100644 --- a/dags/adm_export.py +++ b/dags/adm_export.py @@ -3,6 +3,7 @@ from airflow import DAG from airflow.hooks.base import BaseHook from airflow.sensors.external_task import ExternalTaskSensor + from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.tags import Tag diff --git a/dags/bhr_collection.py b/dags/bhr_collection.py index 1a1261e39..8d7f3f971 100644 --- a/dags/bhr_collection.py +++ b/dags/bhr_collection.py @@ -21,6 +21,7 @@ from airflow.operators.subdag import SubDagOperator from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.sensors.external_task import ExternalTaskSensor + from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner diff --git a/dags/burnham.py b/dags/burnham.py index f532a8755..b3c9dcec8 100644 --- a/dags/burnham.py +++ b/dags/burnham.py @@ -6,13 +6,13 @@ import datetime import json import logging -import uuid import time - +import uuid from dataclasses import dataclass from airflow import DAG from airflow.operators.python import PythonOperator + from operators.bq_sensor import BigQuerySQLSensorOperator from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag @@ -431,7 +431,8 @@ def burnham_run( gke_namespace=DEFAULT_GKE_NAMESPACE, **kwargs, ): - """Create a new GKEPodOperator that runs the burnham Docker image. + """ + Create a new GKEPodOperator that runs the burnham Docker image. :param str task_id: [Required] ID for the task :param str burnham_test_run: [Required] UUID for the test run @@ -482,7 +483,8 @@ def burnham_run( def burnham_sensor(task_id, sql, gcp_conn_id=DEFAULT_GCP_CONN_ID, **kwargs): - """Create a new BigQuerySQLSensorOperator that checks for burnham data. + """ + Create a new BigQuerySQLSensorOperator that checks for burnham data. :param str task_id: [Required] ID for the task :param str sql: [Required] SQL for the sensor @@ -512,7 +514,8 @@ def burnham_bigquery_run( gke_namespace=DEFAULT_GKE_NAMESPACE, **kwargs, ): - """Create a new GKEPodOperator that runs the burnham-bigquery Docker image. + """ + Create a new GKEPodOperator that runs the burnham-bigquery Docker image. :param str task_id: [Required] ID for the task :param str project_id: [Required] Project ID where target table lives @@ -558,7 +561,8 @@ def burnham_bigquery_run( def encode_test_scenarios(test_scenarios): - """Encode the given test scenarios as a str. + """ + Encode the given test scenarios as a str. :param List[Dict[str, object]] test_scenarios: [Required] ID for the task :return: str @@ -570,7 +574,8 @@ def encode_test_scenarios(test_scenarios): def do_sleep(minutes): - """Sleep for the given number of minutes. + """ + Sleep for the given number of minutes. Writes out an update every minute to give some indication of aliveness. """ @@ -581,7 +586,8 @@ def do_sleep(minutes): def sleep_task(minutes, task_id): - """Return an operator that sleeps for a certain number of minutes. + """ + Return an operator that sleeps for a certain number of minutes. :param int minutes: [Required] Number of minutes to sleep :param string task_id: [Required] ID for the task @@ -592,7 +598,7 @@ def sleep_task(minutes, task_id): task_id=task_id, depends_on_past=False, python_callable=do_sleep, - op_kwargs=dict(minutes=minutes), + op_kwargs={"minutes": minutes}, ) @@ -605,7 +611,6 @@ def sleep_task(minutes, task_id): doc_md=DOCS, tags=tags, ) as dag: - # Generate a UUID for this test run generate_burnham_test_run_uuid = PythonOperator( task_id="generate_burnham_test_run_uuid", diff --git a/dags/clean_gke_pods.py b/dags/clean_gke_pods.py index 1076d0b24..8e6078519 100644 --- a/dags/clean_gke_pods.py +++ b/dags/clean_gke_pods.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta from airflow import DAG + from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag diff --git a/dags/crash_symbolication.py b/dags/crash_symbolication.py index 3848c7ae7..c1ba6168c 100644 --- a/dags/crash_symbolication.py +++ b/dags/crash_symbolication.py @@ -11,6 +11,7 @@ from airflow.operators.subdag import SubDagOperator from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.sensors.external_task import ExternalTaskSensor + from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner from utils.tags import Tag diff --git a/dags/data_monitoring.py b/dags/data_monitoring.py index d1a46e85c..62a6d3573 100644 --- a/dags/data_monitoring.py +++ b/dags/data_monitoring.py @@ -1,9 +1,9 @@ from datetime import datetime from airflow import DAG -from operators.gcp_container_operator import GKEPodOperator -from dags.utils.tags import Tag +from operators.gcp_container_operator import GKEPodOperator +from utils.tags import Tag DOCS = """\ This DAG is related to data monitoring project it is still under development. @@ -57,7 +57,6 @@ tags=TAGS, catchup=True, ) as dag: - for target_dataset in TARGET_DATASETS: project_id, dataset, table = target_dataset.split(".") diff --git a/dags/dim_active_users_aggregates.py b/dags/dim_active_users_aggregates.py index 90ab8b554..ff88e8073 100644 --- a/dags/dim_active_users_aggregates.py +++ b/dags/dim_active_users_aggregates.py @@ -15,6 +15,7 @@ from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.sensors.external_task import ExternalTaskSensor + from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.tags import Tag diff --git a/dags/experiment_auto_sizing.py b/dags/experiment_auto_sizing.py index f1907d3cf..d367589e6 100644 --- a/dags/experiment_auto_sizing.py +++ b/dags/experiment_auto_sizing.py @@ -10,6 +10,7 @@ from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor + from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.tags import Tag @@ -34,7 +35,6 @@ doc_md=__doc__, tags=tags, ) as dag: - # Built from repo https://github.com/mozilla/auto-sizing auto_sizing_image = "gcr.io/moz-fx-data-experiments/auto_sizing:latest" diff --git a/dags/firefox_public_data_report.py b/dags/firefox_public_data_report.py index 276fadac6..c2ecbac7a 100644 --- a/dags/firefox_public_data_report.py +++ b/dags/firefox_public_data_report.py @@ -10,6 +10,7 @@ from airflow import DAG from airflow.operators.subdag import SubDagOperator from airflow.sensors.external_task import ExternalTaskSensor + from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.dataproc import ( diff --git a/dags/fivetran_acoustic.py b/dags/fivetran_acoustic.py index c05f9617f..bda7bfd83 100644 --- a/dags/fivetran_acoustic.py +++ b/dags/fivetran_acoustic.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import Any, Dict +from typing import Any from airflow import DAG from airflow.hooks.base import BaseHook @@ -8,13 +8,13 @@ from fivetran_provider.operators.fivetran import FivetranOperator from fivetran_provider.sensors.fivetran import FivetranSensor -from dags.utils.acoustic.acoustic_client import AcousticClient -from dags.utils.callbacks import retry_tasks_callback -from dags.utils.tags import Tag +from utils.acoustic.acoustic_client import AcousticClient +from utils.callbacks import retry_tasks_callback +from utils.tags import Tag def _generate_acoustic_report( - conn_id: str, report_type: str, config: Dict[Any, Any], *args, **kwargs + conn_id: str, report_type: str, config: dict[Any, Any], *args, **kwargs ): """Retrieve Acoustic connection details from Airflow, instantiate AcousticClient and generate report.""" diff --git a/dags/glam.py b/dags/glam.py index a37403ab5..3d202dd7c 100644 --- a/dags/glam.py +++ b/dags/glam.py @@ -14,12 +14,13 @@ from airflow.operators.subdag import SubDagOperator from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor from airflow.utils.task_group import TaskGroup -from glam_subdags.extract import extract_user_counts, extracts_subdag -from glam_subdags.general import repeated_subdag -from glam_subdags.generate_query import generate_and_run_desktop_query -from glam_subdags.histograms import histogram_aggregates_subdag + from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.gcp import bigquery_etl_query, gke_command +from utils.glam_subdags.extract import extract_user_counts, extracts_subdag +from utils.glam_subdags.general import repeated_subdag +from utils.glam_subdags.generate_query import generate_and_run_desktop_query +from utils.glam_subdags.histograms import histogram_aggregates_subdag from utils.tags import Tag project_id = "moz-fx-data-shared-prod" diff --git a/dags/glam_dev.py b/dags/glam_dev.py index 1f9c53efb..af4232810 100644 --- a/dags/glam_dev.py +++ b/dags/glam_dev.py @@ -14,11 +14,12 @@ from airflow import DAG from airflow.operators.subdag import SubDagOperator -from glam_subdags.general import repeated_subdag -from glam_subdags.generate_query import generate_and_run_desktop_query -from glam_subdags.histograms import histogram_aggregates_subdag -from glam_subdags.probe_hotlist import update_hotlist + from utils.gcp import bigquery_etl_query, gke_command +from utils.glam_subdags.general import repeated_subdag +from utils.glam_subdags.generate_query import generate_and_run_desktop_query +from utils.glam_subdags.histograms import histogram_aggregates_subdag +from utils.glam_subdags.probe_hotlist import update_hotlist from utils.tags import Tag prod_project_id = "moz-fx-data-shared-prod" diff --git a/dags/glam_fenix.py b/dags/glam_fenix.py index 74a736be6..085f6a6fc 100644 --- a/dags/glam_fenix.py +++ b/dags/glam_fenix.py @@ -1,5 +1,6 @@ """ -Firefox for Android ETL for https://glam.telemetry.mozilla.org/ +Firefox for Android ETL for https://glam.telemetry.mozilla.org/. + Generates and runs a series of BQ queries, see [bigquery_etl/glam](https://github.com/mozilla/bigquery-etl/tree/main/bigquery_etl/glam) in bigquery-etl and the @@ -8,20 +9,19 @@ """ from datetime import datetime, timedelta +from functools import partial from airflow import DAG from airflow.operators.empty import EmptyOperator -from airflow.sensors.external_task import ExternalTaskMarker -from airflow.sensors.external_task import ExternalTaskSensor +from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor from airflow.utils.task_group import TaskGroup -from glam_subdags.generate_query import ( +from utils.constants import ALLOWED_STATES, FAILED_STATES +from utils.gcp import gke_command +from utils.glam_subdags.generate_query import ( generate_and_run_glean_queries, generate_and_run_glean_task, ) -from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import gke_command -from functools import partial from utils.tags import Tag default_args = { @@ -93,10 +93,10 @@ ) pre_import = EmptyOperator( - task_id='pre_import', + task_id="pre_import", ) - with TaskGroup('glam_fenix_external') as glam_fenix_external: + with TaskGroup("glam_fenix_external") as glam_fenix_external: ExternalTaskMarker( task_id="glam_glean_imports__wait_for_fenix", external_dag_id="glam_glean_imports", @@ -112,8 +112,7 @@ task_id=f"daily_{product}", product=product, destination_project_id=PROJECT, - env_vars=dict(STAGE="daily"), - + env_vars={"STAGE": "daily"}, ) mapping[product] = query wait_for_copy_deduplicate >> query @@ -127,12 +126,12 @@ generate_and_run_glean_task, product=product, destination_project_id=PROJECT, - env_vars=dict(STAGE="incremental"), - + env_vars={"STAGE": "incremental"}, + ) + view, init, query = ( + partial(func, task_type=task_type) + for task_type in ["view", "init", "query"] ) - view, init, query = [ - partial(func, task_type=task_type) for task_type in ["view", "init", "query"] - ] # stage 1 - incremental clients_daily_histogram_aggregates = view( @@ -162,8 +161,12 @@ scalar_probe_counts = query(task_name=f"{product}__scalar_probe_counts_v1") scalar_percentile = query(task_name=f"{product}__scalar_percentiles_v1") - histogram_bucket_counts = query(task_name=f"{product}__histogram_bucket_counts_v1") - histogram_probe_counts = query(task_name=f"{product}__histogram_probe_counts_v1") + histogram_bucket_counts = query( + task_name=f"{product}__histogram_bucket_counts_v1" + ) + histogram_probe_counts = query( + task_name=f"{product}__histogram_probe_counts_v1" + ) histogram_percentiles = query(task_name=f"{product}__histogram_percentiles_v1") probe_counts = view(task_name=f"{product}__view_probe_counts_v1") @@ -186,7 +189,6 @@ command=["script/glam/export_csv"], docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest", gcp_conn_id="google_cloud_airflow_gke", - ) # set all of the dependencies for all of the tasks @@ -228,5 +230,11 @@ >> probe_counts ) probe_counts >> sample_counts >> extract_probe_counts >> export >> pre_import - clients_scalar_aggregate >> user_counts >> extract_user_counts >> export >> pre_import + ( + clients_scalar_aggregate + >> user_counts + >> extract_user_counts + >> export + >> pre_import + ) clients_histogram_aggregate >> export >> pre_import diff --git a/dags/glam_fog.py b/dags/glam_fog.py index 6482f6d17..87d50d5d1 100644 --- a/dags/glam_fog.py +++ b/dags/glam_fog.py @@ -5,12 +5,13 @@ from airflow.operators.empty import EmptyOperator from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor from airflow.utils.task_group import TaskGroup -from glam_subdags.generate_query import ( + +from utils.constants import ALLOWED_STATES, FAILED_STATES +from utils.gcp import gke_command +from utils.glam_subdags.generate_query import ( generate_and_run_glean_queries, generate_and_run_glean_task, ) -from utils.constants import ALLOWED_STATES, FAILED_STATES -from utils.gcp import gke_command from utils.tags import Tag default_args = { diff --git a/dags/glam_glean_imports.py b/dags/glam_glean_imports.py index d78f6ce49..6bad0424a 100644 --- a/dags/glam_glean_imports.py +++ b/dags/glam_glean_imports.py @@ -6,6 +6,7 @@ from airflow.models import Variable from airflow.sensors.external_task import ExternalTaskSensor from airflow.utils.task_group import TaskGroup + from operators.gcp_container_operator import GKENatPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.tags import Tag diff --git a/dags/jetstream.py b/dags/jetstream.py index 90b73de01..3170d6b26 100644 --- a/dags/jetstream.py +++ b/dags/jetstream.py @@ -16,6 +16,7 @@ from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor + from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.tags import Tag @@ -43,7 +44,6 @@ doc_md=__doc__, tags=tags, ) as dag: - # Built from repo https://github.com/mozilla/jetstream jetstream_image = "gcr.io/moz-fx-data-experiments/jetstream:latest" diff --git a/dags/merino_jobs.py b/dags/merino_jobs.py index b8cdbe485..a55f09964 100644 --- a/dags/merino_jobs.py +++ b/dags/merino_jobs.py @@ -1,9 +1,10 @@ import datetime -from typing import Any, Dict, List, Optional +from typing import Any from airflow import DAG from airflow.hooks.base import BaseHook from airflow.operators.email import EmailOperator + from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag @@ -16,7 +17,7 @@ def merino_job( - name: str, arguments: List[str], env_vars: Optional[Dict[str, Any]] = None, **kwargs + name: str, arguments: list[str], env_vars: dict[str, Any] | None = None, **kwargs ): default_env_vars = {"MERINO_ENV": "production"} if env_vars is None: diff --git a/dags/operational_monitoring.py b/dags/operational_monitoring.py index 7454ba5ee..4e0e857f2 100644 --- a/dags/operational_monitoring.py +++ b/dags/operational_monitoring.py @@ -1,4 +1,4 @@ -from datetime import timedelta, datetime +from datetime import datetime, timedelta from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor @@ -11,7 +11,7 @@ This DAG schedules queries for populating datasets used for operational monitoring. -The queries are generated via [`opmon`](https://github.com/mozilla/opmon). +The queries are generated via [`opmon`](https://github.com/mozilla/opmon). #### Owner @@ -52,7 +52,7 @@ email=["ascholtz@mozilla.com"], arguments=[ "--log_to_bigquery", - "run", + "run", "--date={{ ds }}", ], dag=dag, @@ -64,8 +64,8 @@ external_task_id="telemetry_derived__clients_daily__v6", execution_delta=timedelta(hours=2), mode="reschedule", - allowed_states=['success'], - failed_states=['failed', 'upstream_failed', 'skipped'], + allowed_states=["success"], + failed_states=["failed", "upstream_failed", "skipped"], pool="DATA_ENG_EXTERNALTASKSENSOR", email_on_retry=False, dag=dag, @@ -77,8 +77,8 @@ external_task_id="search_derived__search_clients_daily__v8", execution_delta=timedelta(hours=1), mode="reschedule", - allowed_states=['success'], - failed_states=['failed', 'upstream_failed', 'skipped'], + allowed_states=["success"], + failed_states=["failed", "upstream_failed", "skipped"], pool="DATA_ENG_EXTERNALTASKSENSOR", email_on_retry=False, dag=dag, diff --git a/dags/overwatch.py b/dags/overwatch.py index 1b196e6e0..c9b179772 100644 --- a/dags/overwatch.py +++ b/dags/overwatch.py @@ -13,6 +13,7 @@ from datetime import datetime from airflow import DAG + from operators.gcp_container_operator import GKEPodOperator default_args = { diff --git a/dags/partybal.py b/dags/partybal.py index 847ed3d96..7d2d540df 100644 --- a/dags/partybal.py +++ b/dags/partybal.py @@ -16,6 +16,7 @@ from datetime import datetime, timedelta from airflow import DAG + from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag diff --git a/dags/probe_scraper.py b/dags/probe_scraper.py index a921aaee9..9eb62d92c 100644 --- a/dags/probe_scraper.py +++ b/dags/probe_scraper.py @@ -10,6 +10,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.utils.weekday import WeekDay + from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag diff --git a/dags/search_term_data_validation_v2.py b/dags/search_term_data_validation_v2.py index 266e9d20a..1c5d16583 100644 --- a/dags/search_term_data_validation_v2.py +++ b/dags/search_term_data_validation_v2.py @@ -11,6 +11,7 @@ from datetime import datetime, timedelta from airflow import DAG + from utils.gcp import gke_command from utils.tags import Tag diff --git a/dags/socorro_import.py b/dags/socorro_import.py index 30dc2964a..46eee2237 100644 --- a/dags/socorro_import.py +++ b/dags/socorro_import.py @@ -1,14 +1,13 @@ +from datetime import datetime, timedelta + from airflow import DAG from airflow.operators.subdag import SubDagOperator - from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import ( CloudDataTransferServiceS3ToGCSOperator, ) from airflow.sensors.external_task import ExternalTaskMarker from airflow.utils.task_group import TaskGroup -from datetime import datetime, timedelta - from operators.gcp_container_operator import GKEPodOperator from utils.dataproc import moz_dataproc_pyspark_runner from utils.tags import Tag @@ -47,8 +46,12 @@ tags = [Tag.ImpactTier.tier_2] -with DAG("socorro_import", default_args=default_args, schedule_interval="@daily", tags=tags,) as dag: - +with DAG( + "socorro_import", + default_args=default_args, + schedule_interval="@daily", + tags=tags, +) as dag: # Unsalted cluster name so subsequent runs fail if the cluster name exists cluster_name = "socorro-import-dataproc-cluster" @@ -101,9 +104,9 @@ "--date", "{{ ds_nodash }}", "--source-gcs-path", - "gs://{}/v1/crash_report".format(gcs_data_bucket), + f"gs://{gcs_data_bucket}/v1/crash_report", "--dest-gcs-path", - "gs://{}/{}".format(gcs_data_bucket, dataset), + f"gs://{gcs_data_bucket}/{dataset}", ], idle_delete_ttl=14400, num_workers=8, @@ -113,7 +116,6 @@ ), ) - bq_gcp_conn_id = "google_cloud_airflow_gke" dest_s3_key = "s3://telemetry-parquet" @@ -121,7 +123,7 @@ # Not using load_to_bigquery since our source data is on GCS. # We do use the parquet2bigquery container to load gcs parquet into bq though. bq_dataset = "telemetry_derived" - bq_table_name = "{}_{}".format(dataset, dataset_version) + bq_table_name = f"{dataset}_{dataset_version}" docker_image = "docker.io/mozilla/parquet2bigquery:20190722" @@ -141,14 +143,15 @@ # We remove the current date partition for idempotency. table_name = "{}:{}.{}${{{{ds_nodash}}}}".format( - "{{ var.value.gcp_shared_prod_project }}", bq_dataset, bq_table_name) + "{{ var.value.gcp_shared_prod_project }}", bq_dataset, bq_table_name + ) remove_bq_table_partition = GKEPodOperator( task_id="remove_socorro_crash_bq_table_partition", gcp_conn_id=bq_gcp_conn_id, name="remove_socorro_crash_bq_table_partition", image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest", - arguments=["bq", "rm", "-f", "--table", table_name] + arguments=["bq", "rm", "-f", "--table", table_name], ) bq_load = GKEPodOperator( @@ -160,7 +163,7 @@ env_vars={"GOOGLE_CLOUD_PROJECT": "{{ var.value.gcp_shared_prod_project }}"}, ) - with TaskGroup('socorro_external') as socorro_external: + with TaskGroup("socorro_external") as socorro_external: ExternalTaskMarker( task_id="crash_symbolication__wait_for_socorro_import", external_dag_id="crash_symbolication", diff --git a/dags/taar_daily.py b/dags/taar_daily.py index 8f2cd6442..36706508c 100644 --- a/dags/taar_daily.py +++ b/dags/taar_daily.py @@ -20,6 +20,7 @@ from airflow.models import Variable from airflow.operators.subdag import SubDagOperator from airflow.sensors.external_task import ExternalTaskSensor + from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.dataproc import ( diff --git a/dags/taar_weekly.py b/dags/taar_weekly.py index 6bfec44f7..9edf7c452 100644 --- a/dags/taar_weekly.py +++ b/dags/taar_weekly.py @@ -1,5 +1,5 @@ """ -This configures a weekly DAG to run the TAAR Ensemble job off. +Configure a weekly DAG to run the TAAR Ensemble job off. For context, see https://github.com/mozilla/taar @@ -10,16 +10,16 @@ any actions for the past failed DAG runs. """ -from airflow import DAG from datetime import datetime, timedelta -from airflow.operators.subdag import SubDagOperator + +from airflow import DAG from airflow.models import Variable +from airflow.operators.subdag import SubDagOperator -from operators.gcp_container_operator import GKEPodOperator # noqa +from operators.gcp_container_operator import GKEPodOperator from utils.dataproc import moz_dataproc_pyspark_runner from utils.tags import Tag - taar_ensemble_cluster_name = "dataproc-taar-ensemble" taar_gcpdataproc_conn_id = "google_cloud_airflow_dataproc" taar_gcpdataproc_project_id = "airflow-dataproc" @@ -32,9 +32,7 @@ TAAR_DATAFLOW_SERVICE_ACCOUNT = Variable.get("taar_dataflow_service_account_email") # This uses a circleci built docker image from github.com/mozilla/taar_gcp_etl -TAAR_ETL_CONTAINER_IMAGE = ( - "gcr.io/moz-fx-data-airflow-prod-88e0/taar_gcp_etl:0.6.5" -) +TAAR_ETL_CONTAINER_IMAGE = "gcr.io/moz-fx-data-airflow-prod-88e0/taar_gcp_etl:0.6.5" DELETE_DAYS = 29 @@ -43,7 +41,7 @@ "email": [ "hwoo@mozilla.com", "epavlov@mozilla.com", - "telemetry-alerts@mozilla.com" + "telemetry-alerts@mozilla.com", ], "depends_on_past": False, "start_date": datetime(2020, 4, 4), @@ -56,7 +54,9 @@ tags = [Tag.ImpactTier.tier_2] taar_weekly = DAG( - "taar_weekly", default_args=default_args_weekly, schedule_interval="@weekly", + "taar_weekly", + default_args=default_args_weekly, + schedule_interval="@weekly", doc_md=__doc__, tags=tags, ) @@ -84,7 +84,7 @@ def taar_profile_common_args(): "--bigtable-instance-id=%s" % TAAR_BIGTABLE_INSTANCE_ID, "--sample-rate=1.0", "--subnetwork=%s" % TAAR_DATAFLOW_SUBNETWORK, - "--dataflow-service-account=%s" % TAAR_DATAFLOW_SERVICE_ACCOUNT + "--dataflow-service-account=%s" % TAAR_DATAFLOW_SERVICE_ACCOUNT, ] @@ -100,7 +100,7 @@ def taar_profile_common_args(): task_id="dump_bq_to_tmp_table", name="dump_bq_to_tmp_table", image=TAAR_ETL_CONTAINER_IMAGE, - arguments=taar_profile_common_args() + ["--fill-bq",], + arguments=[*taar_profile_common_args(), "--fill-bq"], dag=taar_weekly, ) @@ -108,7 +108,7 @@ def taar_profile_common_args(): task_id="extract_bq_tmp_to_gcs_avro", name="extract_bq_tmp_to_gcs_avro", image=TAAR_ETL_CONTAINER_IMAGE, - arguments=taar_profile_common_args() + ["--bq-to-gcs",], + arguments=[*taar_profile_common_args(), "--bq-to-gcs"], dag=taar_weekly, ) @@ -121,7 +121,7 @@ def taar_profile_common_args(): # Where the pod continues to run, but airflow loses its connection and sets the status to Failed # See: https://github.com/mozilla/telemetry-airflow/issues/844 get_logs=False, - arguments=taar_profile_common_args() + ["--gcs-to-bigtable",], + arguments=[*taar_profile_common_args(), "--gcs-to-bigtable"], dag=taar_weekly, ) @@ -129,9 +129,12 @@ def taar_profile_common_args(): task_id="delete_opt_out_users_from_bigtable", name="delete_opt_out_users_from_bigtable", image=TAAR_ETL_CONTAINER_IMAGE, - arguments=taar_profile_common_args() + ["--bigtable-delete-opt-out", - "--delete-opt-out-days=%s" % DELETE_DAYS], - dag=taar_weekly + arguments=[ + *taar_profile_common_args(), + "--bigtable-delete-opt-out", + "--delete-opt-out-days=%s" % DELETE_DAYS, + ], + dag=taar_weekly, ) wipe_gcs_bucket_cleanup = GKEPodOperator( @@ -146,7 +149,7 @@ def taar_profile_common_args(): task_id="wipe_bigquery_tmp_table", name="wipe_bigquery_tmp_table", image=TAAR_ETL_CONTAINER_IMAGE, - arguments=taar_profile_common_args() + ["--wipe-bigquery-tmp-table",], + arguments=[*taar_profile_common_args(), "--wipe-bigquery-tmp-table"], dag=taar_weekly, ) @@ -177,7 +180,7 @@ def taar_profile_common_args(): ], additional_metadata={ "PIP_PACKAGES": "mozilla-taar3==1.0.7 python-decouple==3.1 click==7.0 " - "google-cloud-storage==1.19.1" + "google-cloud-storage==1.19.1" }, optional_components=["ANACONDA", "JUPYTER"], py_args=[ @@ -201,11 +204,13 @@ def taar_profile_common_args(): ) -wipe_gcs_bucket >> \ -dump_bq_to_tmp_table >> \ -extract_bq_tmp_to_gcs_avro >> \ -dataflow_import_avro_to_bigtable >> \ -delete_optout >> \ -wipe_gcs_bucket_cleanup >> \ -wipe_bigquery_tmp_table >> \ -taar_ensemble +( + wipe_gcs_bucket + >> dump_bq_to_tmp_table + >> extract_bq_tmp_to_gcs_avro + >> dataflow_import_avro_to_bigtable + >> delete_optout + >> wipe_gcs_bucket_cleanup + >> wipe_bigquery_tmp_table + >> taar_ensemble +) diff --git a/dags/utils/constants.py b/dags/utils/constants.py deleted file mode 100644 index a8618d6db..000000000 --- a/dags/utils/constants.py +++ /dev/null @@ -1,11 +0,0 @@ -DS_WEEKLY = ( - '{% if dag_run.external_trigger %}' - '{{ ds_nodash }}' - '{% else %}' - '{{ macros.ds_format(macros.ds_add(ds, 6), "%Y-%m-%d", "%Y%m%d") }}' - '{% endif %}' -) - -FAILED_STATES = ['failed', 'upstream_failed', 'skipped'] - -ALLOWED_STATES = ['success'] diff --git a/docker-compose.yml b/docker-compose.yml index 20d2b55c4..a91144bdf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,6 +27,7 @@ x-airflow-common: AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__CORE__FERNET_KEY: $FERNET_KEY + AIRFLOW__CORE__DAGS_FOLDER: "/opt/airflow/dags" AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'false' AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.default' @@ -40,6 +41,8 @@ x-airflow-common: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins + - ./utils:/opt/airflow/utils + - ./operators:/opt/airflow/operators - ./resources/dev_webserver_config.py:/opt/airflow/webserver_config.py - ./resources/dev_connections.json:/opt/airflow/dev_connections.json - ./resources/dev_variables.json:/opt/airflow/dev_variables.json diff --git a/dags/glam_subdags/__init__.py b/operators/__init__.py similarity index 100% rename from dags/glam_subdags/__init__.py rename to operators/__init__.py diff --git a/dags/operators/bq_sensor.py b/operators/bq_sensor.py similarity index 100% rename from dags/operators/bq_sensor.py rename to operators/bq_sensor.py diff --git a/dags/operators/gcp_container_operator.py b/operators/gcp_container_operator.py similarity index 59% rename from dags/operators/gcp_container_operator.py rename to operators/gcp_container_operator.py index aaa24d3cd..c6939acbe 100644 --- a/dags/operators/gcp_container_operator.py +++ b/operators/gcp_container_operator.py @@ -1,48 +1,55 @@ -from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator as UpstreamGKEPodOperator +from airflow.providers.google.cloud.operators.kubernetes_engine import ( + GKEStartPodOperator as UpstreamGKEPodOperator, +) class GKEPodOperator(UpstreamGKEPodOperator): """ + Based off GKEStartPodOperator. + - This will now have the same defaults as GkeNatPodOperator pointing to the newer GKE cluster - In 1.10.x this inherited from upstream GKEPodOperator, rather than GKEStartPodOperator(v2) - In 1.10.x we needed to override the execute and helper methods to set an environment -variable for authentication to work (CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE). Fixed in v2 + variable for authentication to work (CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE). Fixed in v2 - We will keep this class and call the upstream GkeStartPodOperator now, because -numerous places in our code references it still + numerous places in our code references it still - Overrides init to default image_pull_policy=Always, in_cluster=False, -do_xcom_push=False and GKE params + do_xcom_push=False and GKE params - Defaults reattach_on_restart=False to address a 1.10.12 regression where GkePodOperators reruns will simply attach to an existing pod and not perform any new work. - Hard sets reattach_on_restart=False when do_xcom_push=True to address an error Retrying a failed task with do_xcom_push=True causes airflow to reattach to the pod eventually causing a 'Handshake status 500 Internal Server Error'. Logs will indicate 'found a running pod with ... different try_number. Will attach to this pod and monitor - instead of starting new one' + instead of starting new one'. """ - def __init__(self, - image_pull_policy='Always', - in_cluster=False, - do_xcom_push=False, - reattach_on_restart=False, - # Defined in Airflow's UI -> Admin -> Connections - gcp_conn_id='google_cloud_airflow_gke', - project_id='moz-fx-data-airflow-gke-prod', - location='us-west1', - cluster_name='workloads-prod-v1', - namespace='default', - *args, - **kwargs): + def __init__( + self, + image_pull_policy="Always", + in_cluster=False, + do_xcom_push=False, + reattach_on_restart=False, + # Defined in Airflow's UI -> Admin -> Connections + gcp_conn_id="google_cloud_airflow_gke", + project_id="moz-fx-data-airflow-gke-prod", + location="us-west1", + cluster_name="workloads-prod-v1", + namespace="default", + *args, + **kwargs + ): # Hard set reattach_on_restart = False when do_xcom_push is enabled. if do_xcom_push: reattach_on_restart = False # GKE node pool autoscaling is failing to scale down when completed pods exist on the node # in Completed states, due to the pod not being replicated. E.g. behind an rc or deployment. - annotations = {'cluster-autoscaler.kubernetes.io/safe-to-evict': 'true'} + annotations = {"cluster-autoscaler.kubernetes.io/safe-to-evict": "true"} - super(GKEPodOperator, self).__init__( + super().__init__( + *args, image_pull_policy=image_pull_policy, in_cluster=in_cluster, do_xcom_push=do_xcom_push, @@ -53,38 +60,38 @@ def __init__(self, location=location, cluster_name=cluster_name, namespace=namespace, - *args, - **kwargs) + **kwargs + ) class GKENatPodOperator(UpstreamGKEPodOperator): - """ - This class is similar to the one defined above but has defaults to use a different GKE Cluster - With a NAT - """ - def __init__(self, - image_pull_policy='Always', - in_cluster=False, - do_xcom_push=False, - reattach_on_restart=False, - # Defined in Airflow's UI -> Admin -> Connections - gcp_conn_id='google_cloud_airflow_gke', - project_id='moz-fx-data-airflow-gke-prod', - location='us-west1', - cluster_name='workloads-prod-v1', - namespace='default', - *args, - **kwargs): + """Similar to the one defined above but has defaults to use a different GKE Cluster With a NAT.""" + def __init__( + self, + image_pull_policy="Always", + in_cluster=False, + do_xcom_push=False, + reattach_on_restart=False, + # Defined in Airflow's UI -> Admin -> Connections + gcp_conn_id="google_cloud_airflow_gke", + project_id="moz-fx-data-airflow-gke-prod", + location="us-west1", + cluster_name="workloads-prod-v1", + namespace="default", + *args, + **kwargs + ): # Hard set reattach_on_restart = False when do_xcom_push is enabled. if do_xcom_push: reattach_on_restart = False # GKE node pool autoscaling is failing to scale down when completed pods exist on the node # in Completed states, due to the pod not being replicated. E.g. behind an rc or deployment. - annotations = {'cluster-autoscaler.kubernetes.io/safe-to-evict': 'true'} + annotations = {"cluster-autoscaler.kubernetes.io/safe-to-evict": "true"} - super(GKENatPodOperator, self).__init__( + super().__init__( + *args, image_pull_policy=image_pull_policy, in_cluster=in_cluster, do_xcom_push=do_xcom_push, @@ -95,5 +102,5 @@ def __init__(self, location=location, cluster_name=cluster_name, namespace=namespace, - *args, - **kwargs) + **kwargs + ) diff --git a/plugins/backfill/main.py b/plugins/backfill/main.py index ad326b9b9..289f40049 100644 --- a/plugins/backfill/main.py +++ b/plugins/backfill/main.py @@ -1,60 +1,54 @@ -# -*- coding: utf-8 -*- # Modified from https://github.com/AnkurChoraywal/airflow-backfill-util.git # Inbuilt Imports -import os -import json -import logging import datetime +import json +import os import re -from typing import List # Custom Imports import flask from flask import request -from flask_admin import BaseView, expose -from flask_appbuilder import expose as app_builder_expose, BaseView as AppBuilderBaseView, has_access -from airflow import configuration - +from flask_admin import expose +from flask_appbuilder import BaseView as AppBuilderBaseView +from flask_appbuilder import expose as app_builder_expose from shelljob import proc -from dags.utils.backfill import BackfillParams + +from utils.backfill import BackfillParams # Inspired from # https://mortoray.com/2014/03/04/http-streaming-of-command-output-in-python-flask/ # https://www.endpoint.com/blog/2015/01/28/getting-realtime-output-using-python -# RBAC inspired from +# RBAC inspired from # https://github.com/teamclairvoyant/airflow-rest-api-plugin # Set your Airflow home path -if 'AIRFLOW_HOME' in os.environ: - airflow_home_path = os.environ['AIRFLOW_HOME'] -else: - airflow_home_path = '/tmp' +airflow_home_path = os.environ.get("AIRFLOW_HOME", "/tmp") # Local file where history will be stored -FILE = airflow_home_path + '/logs/backfill_history.txt' +FILE = airflow_home_path + "/logs/backfill_history.txt" # RE for remove ansi escape characters -ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]') +ansi_escape = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]") # Creating a flask admin BaseView def file_ops(mode, data=None): - """ File operators - logging/writing and reading user request """ - if mode == 'r': + """File operators - logging/writing and reading user request.""" + if mode == "r": try: - with open(FILE, 'r') as f: + with open(FILE) as f: return f.read() - except IOError: - with open(FILE, 'w') as f: + except OSError: + with open(FILE, "w") as f: return f.close() - elif mode == 'w' and data: + elif mode == "w" and data: today = datetime.datetime.now() print(os.getcwd()) - with open(FILE, 'a+') as f: - file_data = '{},{}\n'.format(data, today) + with open(FILE, "a+") as f: + file_data = f"{data},{today}\n" f.write(file_data) return 1 @@ -66,14 +60,14 @@ def get_baseview(): class Backfill(get_baseview()): route_base = "/admin/backfill/" - @app_builder_expose('/') + @app_builder_expose("/") def list(self): return self.render_template("backfill_page.html") - @expose('/stream') - @app_builder_expose('/stream') + @expose("/stream") + @app_builder_expose("/stream") def stream(self): - """ Runs user request and outputs console stream to client""" + """Run user request and outputs console stream to client.""" dag_name = request.args.get("dag_name") start_date = request.args.get("start_date") end_date = request.args.get("end_date") @@ -89,43 +83,44 @@ def stream(self): task_regex=task_regex, dag_name=dag_name, start_date=start_date, - end_date=end_date) + end_date=end_date, + ) cmd = backfill_params.generate_backfill_command() - print('BACKFILL CMD:', cmd) + print("BACKFILL CMD:", cmd) # updates command used in history - file_ops('w', ' '.join(cmd)) + file_ops("w", " ".join(cmd)) g = proc.Group() g.run(cmd) # To print out cleared dry run task instances - pattern = '^\<.*\>$' + pattern = r"^\<.*\>$" def read_process(): while g.is_pending(): lines = g.readlines() - for proc, line in lines: + for _proc, line in lines: line = line.decode("utf-8") result = re.match(pattern, line) if result: # Adhere to text/event-stream format - line = line.replace('<', '').replace('>', '') + line = line.replace("<", "").replace(">", "") elif clear and not dry_run: # Special case/hack, airflow tasks clear -y no longer outputs a termination string, so we put one line = "Clear Done" yield "data:" + line + "\n\n" - return flask.Response(read_process(), mimetype='text/event-stream') + return flask.Response(read_process(), mimetype="text/event-stream") - @expose('/background') - @app_builder_expose('/background') + @expose("/background") + @app_builder_expose("/background") def background(self): - """ Runs user request in background """ + """Run user request in background.""" dag_name = request.args.get("dag_name") start_date = request.args.get("start_date") end_date = request.args.get("end_date") @@ -134,32 +129,32 @@ def background(self): use_task_regex = request.args.get("use_task_regex") # create a screen id based on timestamp - screen_id = datetime.datetime.now().strftime('%s') + screen_id = datetime.datetime.now().strftime("%s") # Prepare the command and execute in background - background_cmd = "screen -dmS {} ".format(screen_id) - if clear == 'true': - background_cmd = background_cmd + 'airflow tasks clear -y ' - elif clear == 'false': - background_cmd = background_cmd + 'airflow dags backfill ' + background_cmd = f"screen -dmS {screen_id} " + if clear == "true": + background_cmd = background_cmd + "airflow tasks clear -y " + elif clear == "false": + background_cmd = background_cmd + "airflow dags backfill " - if use_task_regex == 'true': - background_cmd = background_cmd + "-t {} ".format(task_regex) + if use_task_regex == "true": + background_cmd = background_cmd + f"-t {task_regex} " - background_cmd = background_cmd + "-s {} -e {} {}".format(start_date, end_date, dag_name) + background_cmd = background_cmd + f"-s {start_date} -e {end_date} {dag_name}" # Update command in file - file_ops('w', background_cmd) + file_ops("w", background_cmd) print(background_cmd) os.system(background_cmd) - response = json.dumps({'submitted': True}) - return flask.Response(response, mimetype='text/json') + response = json.dumps({"submitted": True}) + return flask.Response(response, mimetype="text/json") - @expose('/history') - @app_builder_expose('/history') + @expose("/history") + @app_builder_expose("/history") def history(self): - """ Outputs recent user request history """ - return flask.Response(file_ops('r'), mimetype='text/txt') + """Output recent user request history.""" + return flask.Response(file_ops("r"), mimetype="text/txt") diff --git a/pyproject.toml b/pyproject.toml index 6a55a1531..e593760ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ known-third-party = ["airflow"] [tool.ruff] -target-version = "py38" +target-version = "py310" select = [ "E", # pycodestyle "W", # pycodestyle diff --git a/tests/utils/test_backfill.py b/tests/utils/test_backfill.py index 95bdedd18..ac87b34bb 100644 --- a/tests/utils/test_backfill.py +++ b/tests/utils/test_backfill.py @@ -1,6 +1,6 @@ import pytest -from dags.utils.backfill import BackfillParams +from utils.backfill import BackfillParams @pytest.fixture() diff --git a/tests/utils/test_tags.py b/tests/utils/test_tags.py index f259a7c6d..5a4de2953 100644 --- a/tests/utils/test_tags.py +++ b/tests/utils/test_tags.py @@ -1,28 +1,29 @@ import pytest -from dags.utils.tags import Tag, InvalidTagError + +from utils.tags import InvalidTagError, Tag @pytest.mark.parametrize( - "actual,expected", + ("actual", "expected"), [ (Tag.ImpactTier.tier_1, "impact/tier_1"), (Tag.ImpactTier.tier_2, "impact/tier_2"), (Tag.ImpactTier.tier_3, "impact/tier_3"), - ] + ], ) def test_valid_impact_tag(actual, expected): assert actual == expected @pytest.mark.parametrize( - "obj,attr,expected", + ("obj", "attr", "expected"), [ (Tag.ImpactTier, "tier_1", "impact/tier_1"), (Tag.ImpactTier, "tier_2", "impact/tier_2"), (Tag.ImpactTier, "tier_3", "impact/tier_3"), - ] + ], ) -def test_valid_impact_tag(obj, attr, expected): +def test_get_impact_tag(obj, attr, expected): assert getattr(obj, attr) == expected @@ -32,8 +33,8 @@ def test_valid_impact_tag(obj, attr, expected): "tier_4", "", "bq-etl", - ] + ], ) -def test_valid_impact_tag(invalid_input): +def test_invalid_impact_tag(invalid_input): with pytest.raises(InvalidTagError): getattr(Tag.ImpactTier, invalid_input) diff --git a/dags/operators/__init__.py b/utils/__init__.py similarity index 100% rename from dags/operators/__init__.py rename to utils/__init__.py diff --git a/dags/utils/__init__.py b/utils/acoustic/__init__.py similarity index 100% rename from dags/utils/__init__.py rename to utils/acoustic/__init__.py diff --git a/dags/utils/acoustic/acoustic_client.py b/utils/acoustic/acoustic_client.py similarity index 83% rename from dags/utils/acoustic/acoustic_client.py rename to utils/acoustic/acoustic_client.py index 28c1bb148..eec18517c 100644 --- a/dags/utils/acoustic/acoustic_client.py +++ b/utils/acoustic/acoustic_client.py @@ -1,15 +1,14 @@ """ -Module for authenticating into and interacting with Acoustic (XML) API +Module for authenticating into and interacting with Acoustic (XML) API. Acoustic API docs can be found here: https://developer.goacoustic.com/acoustic-campaign/reference/overview """ +import logging from datetime import datetime from time import sleep -import logging import requests - import xmltodict # type: ignore DATE_FORMAT = "%m/%d/%Y %H:%M:%S" @@ -22,9 +21,7 @@ def _request_wrapper(request_method, request_body): class AcousticClient: - """ - Acoustic Client object for authenticating into and interacting with Acoustic XML API. - """ + """Acoustic Client object for authenticating into and interacting with Acoustic XML API.""" DEFAULT_BASE_URL = "https://api-campaign-us-6.goacoustic.com" XML_API_ENDPOINT = "XMLAPI" @@ -34,10 +31,12 @@ def __init__( client_id: str, client_secret: str, refresh_token: str, - base_url: str = None, + base_url: str | None = None, **kwargs, ): """ + Initialize. + :param client_id: to provide the client identity :param client_secret: secret that it used to confirm identity to the API :param refresh_token: A long-lived value that the client store, @@ -54,12 +53,13 @@ def __init__( def _generate_access_token(self) -> str: """ - Responsible for contacting Acoustic XML API and generating an access_token using Oauth method + Responsible for contacting Acoustic XML API and generating an access_token using Oauth method. + to be used for interacting with the API and retrieving data in the following calls. :return: A short-lived token that can be generated based on the refresh token. A value that is ultimately passed to the API to prove that this client is authorized - to make API calls on the user’s behalf. + to make API calls on the user`s behalf. More info about Acoustic and Oauth: https://developer.goacoustic.com/acoustic-campaign/reference/overview#getting-started-with-oauth @@ -92,9 +92,9 @@ def _generate_access_token(self) -> str: return response.json()["access_token"] - def _is_job_complete(self, job_id: int, extra_info: str = None) -> bool: + def _is_job_complete(self, job_id: int, extra_info: str | None = None) -> bool: """ - Checks status of an Acoustic job to generate a report. + Check status of an Acoustic job to generate a report. :param job_id: Acoustic job id to check the progress status of @@ -128,19 +128,21 @@ def _is_job_complete(self, job_id: int, extra_info: str = None) -> bool: job_status = data["Envelope"]["Body"]["RESULT"]["JOB_STATUS"].lower() print( - "INFO: Current status for Acoustic job_id: %s is %s (%s)" - % (job_id, job_status, extra_info or "") + "INFO: Current status for Acoustic job_id: {} is {} ({})".format( + job_id, job_status, extra_info or "" + ) ) - return "complete" == job_status + return job_status == "complete" def generate_report( - self, request_template: str, template_params: dict, report_type: str, + self, + request_template: str, + template_params: dict, + report_type: str, ) -> str: """ - Extracts a listing of Acoustic Campaign emails that are sent for an organization - for a specified date range and provides metrics for those emails. - + Extract a listing of Acoustic Campaign emails that are sent for an organization for a specified date range and provides metrics for those emails. :param request_template: path to XML template file containing request body template to be used :param template_params: values that should be used to render the request template provided @@ -148,7 +150,10 @@ def generate_report( :return: Returns back a list of rows of data returned by the API call as a dictionary """ - supported_report_types = ("raw_recipient_export", "contact_export",) + supported_report_types = ( + "raw_recipient_export", + "contact_export", + ) if report_type not in supported_report_types: err_msg = f"{report_type} is not a valid option, supported types are: {supported_report_types}" raise AttributeError(err_msg) @@ -182,6 +187,8 @@ def generate_report( while not self._is_job_complete(job_id=job_id, extra_info=report_type): sleep(sleep_delay) - logging.info(f"{report_type} generation complete. Report location: {report_loc}. Time taken: {datetime.now() - start}") + logging.info( + f"{report_type} generation complete. Report location: {report_loc}. Time taken: {datetime.now() - start}" + ) return diff --git a/dags/utils/backfill.py b/utils/backfill.py similarity index 100% rename from dags/utils/backfill.py rename to utils/backfill.py diff --git a/dags/utils/callbacks.py b/utils/callbacks.py similarity index 65% rename from dags/utils/callbacks.py rename to utils/callbacks.py index 18180e522..25e65c5a0 100644 --- a/dags/utils/callbacks.py +++ b/utils/callbacks.py @@ -1,22 +1,25 @@ -from typing import Optional +from typing import TYPE_CHECKING -from airflow.models.dagrun import DagRun from airflow.models.taskinstance import clear_task_instances from airflow.utils.context import Context from airflow.utils.db import provide_session from sqlalchemy.orm.session import Session +if TYPE_CHECKING: + from airflow.models.dagrun import DagRun + @provide_session -def retry_tasks_callback(context: Context, session: Optional[Session] = None) -> None: - """Callback to clear tasks specified by the `retry_tasks` task param. - +def retry_tasks_callback(context: Context, session: Session | None = None) -> None: + """ + Clear tasks specified by the `retry_tasks` task param. + Intended to be used to as an `on_retry_callback` to also retry other tasks when a task fails. """ - retry_task_ids: list[str] = context['params'].get('retry_tasks', []) + retry_task_ids: list[str] = context["params"].get("retry_tasks", []) if isinstance(retry_task_ids, str): retry_task_ids = [retry_task_ids] - dag_run: DagRun = context['dag_run'] + dag_run: DagRun = context["dag_run"] retry_task_instances = [ task_instance for task_instance in dag_run.get_task_instances(session=session) diff --git a/utils/constants.py b/utils/constants.py new file mode 100644 index 000000000..693bf2b5c --- /dev/null +++ b/utils/constants.py @@ -0,0 +1,11 @@ +DS_WEEKLY = ( + "{% if dag_run.external_trigger %}" + "{{ ds_nodash }}" + "{% else %}" + '{{ macros.ds_format(macros.ds_add(ds, 6), "%Y-%m-%d", "%Y%m%d") }}' + "{% endif %}" +) + +FAILED_STATES = ["failed", "upstream_failed", "skipped"] + +ALLOWED_STATES = ["success"] diff --git a/dags/utils/dataproc.py b/utils/dataproc.py similarity index 54% rename from dags/utils/dataproc.py rename to utils/dataproc.py index 981350f8f..94373a63b 100644 --- a/dags/utils/dataproc.py +++ b/utils/dataproc.py @@ -1,4 +1,3 @@ -import json import os from collections import namedtuple @@ -19,43 +18,44 @@ DataprocSubmitSparkJobOperator, ) -class DataProcHelper: - """ - This is a helper class for creating/deleting dataproc clusters. - """ - - def __init__(self, - cluster_name=None, - job_name=None, - num_workers=2, - image_version='1.4-debian10', - region='us-west1', - subnetwork_uri=None, - internal_ip_only=None, - idle_delete_ttl=14400, - auto_delete_ttl=28800, - master_machine_type='n1-standard-8', - worker_machine_type='n1-standard-4', - num_preemptible_workers=0, - service_account='dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com', - init_actions_uris=None, - additional_metadata=None, - additional_properties=None, - optional_components=['ANACONDA'], - install_component_gateway=True, - aws_conn_id=None, - gcp_conn_id='google_cloud_airflow_dataproc', - project_id='airflow-dataproc', - artifact_bucket='moz-fx-data-prod-airflow-dataproc-artifacts', - storage_bucket='moz-fx-data-prod-dataproc-scratch', - master_disk_type='pd-standard', - master_disk_size=1024, - master_num_local_ssds=0, - worker_disk_type='pd-standard', - worker_disk_size=1024, - worker_num_local_ssds=0, - ): +class DataProcHelper: + """Helper class for creating/deleting dataproc clusters.""" + + def __init__( + self, + cluster_name=None, + job_name=None, + num_workers=2, + image_version="1.4-debian10", + region="us-west1", + subnetwork_uri=None, + internal_ip_only=None, + idle_delete_ttl=14400, + auto_delete_ttl=28800, + master_machine_type="n1-standard-8", + worker_machine_type="n1-standard-4", + num_preemptible_workers=0, + service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com", + init_actions_uris=None, + additional_metadata=None, + additional_properties=None, + optional_components=None, + install_component_gateway=True, + aws_conn_id=None, + gcp_conn_id="google_cloud_airflow_dataproc", + project_id="airflow-dataproc", + artifact_bucket="moz-fx-data-prod-airflow-dataproc-artifacts", + storage_bucket="moz-fx-data-prod-dataproc-scratch", + master_disk_type="pd-standard", + master_disk_size=1024, + master_num_local_ssds=0, + worker_disk_type="pd-standard", + worker_disk_size=1024, + worker_num_local_ssds=0, + ): + if optional_components is None: + optional_components = ["ANACONDA"] self.cluster_name = cluster_name self.job_name = job_name self.num_workers = num_workers @@ -82,9 +82,11 @@ def __init__(self, self.worker_num_local_ssds = worker_num_local_ssds if init_actions_uris is None: - self.init_actions_uris=['gs://{}/bootstrap/dataproc_init.sh'.format(self.artifact_bucket)] + self.init_actions_uris = [ + f"gs://{self.artifact_bucket}/bootstrap/dataproc_init.sh" + ] else: - self.init_actions_uris=init_actions_uris + self.init_actions_uris = init_actions_uris if additional_metadata is None: self.additional_metadata = {} @@ -103,9 +105,7 @@ def __init__(self, self.project_id = project_id def create_cluster(self): - """ - Returns a DataprocCreateClusterOperator - """ + """Return a DataprocCreateClusterOperator.""" properties = {} # Google cloud storage requires object.create permission when reading from pyspark @@ -115,7 +115,9 @@ def create_cluster(self): if self.aws_conn_id: for key, value in zip( ("access.key", "secret.key", "session.token"), - AwsBaseHook(aws_conn_id=self.aws_conn_id, client_type='s3').get_credentials(), + AwsBaseHook( + aws_conn_id=self.aws_conn_id, client_type="s3" + ).get_credentials(), ): if value is not None: properties["core:fs.s3a." + key] = value @@ -128,22 +130,22 @@ def create_cluster(self): properties.update(self.additional_properties) metadata = { - 'gcs-connector-version': '1.9.16', - 'bigquery-connector-version': '0.13.6' - } + "gcs-connector-version": "1.9.16", + "bigquery-connector-version": "0.13.6", + } metadata.update(self.additional_metadata) cluster_generator = ClusterGenerator( - project_id = self.project_id, - num_workers = self.num_workers, - subnetwork_uri = self.subnetwork_uri, - internal_ip_only = self.internal_ip_only, + project_id=self.project_id, + num_workers=self.num_workers, + subnetwork_uri=self.subnetwork_uri, + internal_ip_only=self.internal_ip_only, storage_bucket=self.storage_bucket, init_actions_uris=self.init_actions_uris, - metadata = metadata, + metadata=metadata, image_version=self.image_version, - properties = properties, - optional_components = self.optional_components, + properties=properties, + optional_components=self.optional_components, master_machine_type=self.master_machine_type, master_disk_type=self.master_disk_type, master_disk_size=self.master_disk_size, @@ -153,7 +155,7 @@ def create_cluster(self): num_preemptible_workers=self.num_preemptible_workers, service_account=self.service_account, idle_delete_ttl=self.idle_delete_ttl, - auto_delete_ttl=self.auto_delete_ttl + auto_delete_ttl=self.auto_delete_ttl, ) cluster_config = cluster_generator.make() @@ -162,83 +164,93 @@ def create_cluster(self): # ClusterConfig format is # https://cloud.google.com/dataproc/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.ClusterConfig if self.install_component_gateway: - cluster_config.update({'endpoint_config' : {'enable_http_port_access' : True}}) + cluster_config.update( + {"endpoint_config": {"enable_http_port_access": True}} + ) if self.master_num_local_ssds > 0: - master_instance_group_config = cluster_config['master_config'] - master_instance_group_config['disk_config']['num_local_ssds'] = self.master_num_local_ssds - cluster_config.update({'master_config' : master_instance_group_config}) + master_instance_group_config = cluster_config["master_config"] + master_instance_group_config["disk_config"][ + "num_local_ssds" + ] = self.master_num_local_ssds + cluster_config.update({"master_config": master_instance_group_config}) if self.worker_num_local_ssds > 0: - worker_instance_group_config = cluster_config['worker_config'] - worker_instance_group_config['disk_config']['num_local_ssds'] =self.worker_num_local_ssds - cluster_config.update({'worker_config' : worker_instance_group_config}) + worker_instance_group_config = cluster_config["worker_config"] + worker_instance_group_config["disk_config"][ + "num_local_ssds" + ] = self.worker_num_local_ssds + cluster_config.update({"worker_config": worker_instance_group_config}) return DataprocCreateClusterOperator( - task_id='create_dataproc_cluster', + task_id="create_dataproc_cluster", cluster_name=self.cluster_name, - project_id = self.project_id, + project_id=self.project_id, use_if_exists=True, delete_on_error=True, - labels={ 'env': os.getenv('DEPLOY_ENVIRONMENT', 'env_not_set'), - 'owner': os.getenv('AIRFLOW_CTX_DAG_OWNER', 'owner_not_set'), - 'jobname': self.job_name.lower().replace('_', '-') }, + labels={ + "env": os.getenv("DEPLOY_ENVIRONMENT", "env_not_set"), + "owner": os.getenv("AIRFLOW_CTX_DAG_OWNER", "owner_not_set"), + "jobname": self.job_name.lower().replace("_", "-"), + }, gcp_conn_id=self.gcp_conn_id, region=self.region, - cluster_config = cluster_config + cluster_config=cluster_config, ) def delete_cluster(self): - """ - Returns a DataprocDeleteClusterOperator - """ + """Return a DataprocDeleteClusterOperator.""" return DataprocDeleteClusterOperator( - task_id='delete_dataproc_cluster', + task_id="delete_dataproc_cluster", cluster_name=self.cluster_name, region=self.region, gcp_conn_id=self.gcp_conn_id, - project_id=self.project_id) -# End DataProcHelper + project_id=self.project_id, + ) -def moz_dataproc_pyspark_runner(parent_dag_name=None, - dag_name='run_pyspark_on_dataproc', - default_args=None, - cluster_name=None, - num_workers=2, - image_version='1.4-debian10', - region='us-west1', - subnetwork_uri=None, - internal_ip_only=None, - idle_delete_ttl=10800, - auto_delete_ttl=21600, - master_machine_type='n1-standard-8', - worker_machine_type='n1-standard-4', - num_preemptible_workers=0, - service_account='dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com', - init_actions_uris=None, - additional_metadata=None, - additional_properties=None, - optional_components=['ANACONDA'], - install_component_gateway=True, - python_driver_code=None, - py_args=None, - job_name=None, - aws_conn_id=None, - gcp_conn_id='google_cloud_airflow_dataproc', - project_id='airflow-dataproc', - artifact_bucket='moz-fx-data-prod-airflow-dataproc-artifacts', - storage_bucket='moz-fx-data-prod-dataproc-scratch', - master_disk_type='pd-standard', - worker_disk_type='pd-standard', - master_disk_size=1024, - worker_disk_size=1024, - master_num_local_ssds=0, - worker_num_local_ssds=0, - ): +# End DataProcHelper + +def moz_dataproc_pyspark_runner( + parent_dag_name=None, + dag_name="run_pyspark_on_dataproc", + default_args=None, + cluster_name=None, + num_workers=2, + image_version="1.4-debian10", + region="us-west1", + subnetwork_uri=None, + internal_ip_only=None, + idle_delete_ttl=10800, + auto_delete_ttl=21600, + master_machine_type="n1-standard-8", + worker_machine_type="n1-standard-4", + num_preemptible_workers=0, + service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com", + init_actions_uris=None, + additional_metadata=None, + additional_properties=None, + optional_components=None, + install_component_gateway=True, + python_driver_code=None, + py_args=None, + job_name=None, + aws_conn_id=None, + gcp_conn_id="google_cloud_airflow_dataproc", + project_id="airflow-dataproc", + artifact_bucket="moz-fx-data-prod-airflow-dataproc-artifacts", + storage_bucket="moz-fx-data-prod-dataproc-scratch", + master_disk_type="pd-standard", + worker_disk_type="pd-standard", + master_disk_size=1024, + worker_disk_size=1024, + master_num_local_ssds=0, + worker_num_local_ssds=0, +): """ - This will initially create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway. + Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway. + Then we call DataprocSubmitPySparkJobOperator to execute the pyspark script defined by the argument python_driver_code. Once that succeeds, we teardown the cluster. @@ -342,98 +354,104 @@ def moz_dataproc_pyspark_runner(parent_dag_name=None, """ + if optional_components is None: + optional_components = ["ANACONDA"] if cluster_name is None or python_driver_code is None: - raise AirflowException('Please specify cluster_name and/or python_driver_code.') - - dataproc_helper = DataProcHelper(cluster_name=cluster_name, - job_name=job_name, - num_workers=num_workers, - image_version=image_version, - region=region, - subnetwork_uri=subnetwork_uri, - internal_ip_only=internal_ip_only, - idle_delete_ttl=idle_delete_ttl, - auto_delete_ttl=auto_delete_ttl, - master_machine_type=master_machine_type, - worker_machine_type=worker_machine_type, - num_preemptible_workers=num_preemptible_workers, - service_account=service_account, - init_actions_uris=init_actions_uris, - optional_components=optional_components, - additional_metadata=additional_metadata, - additional_properties=additional_properties, - install_component_gateway=install_component_gateway, - aws_conn_id=aws_conn_id, - gcp_conn_id=gcp_conn_id, - project_id=project_id, - artifact_bucket=artifact_bucket, - storage_bucket=storage_bucket, - master_disk_type=master_disk_type, - master_disk_size=master_disk_size, - worker_disk_type=worker_disk_type, - worker_disk_size=worker_disk_size, - master_num_local_ssds=master_num_local_ssds, - worker_num_local_ssds=worker_num_local_ssds, - ) - - _dag_name = '{}.{}'.format(parent_dag_name, dag_name) + raise AirflowException("Please specify cluster_name and/or python_driver_code.") + + dataproc_helper = DataProcHelper( + cluster_name=cluster_name, + job_name=job_name, + num_workers=num_workers, + image_version=image_version, + region=region, + subnetwork_uri=subnetwork_uri, + internal_ip_only=internal_ip_only, + idle_delete_ttl=idle_delete_ttl, + auto_delete_ttl=auto_delete_ttl, + master_machine_type=master_machine_type, + worker_machine_type=worker_machine_type, + num_preemptible_workers=num_preemptible_workers, + service_account=service_account, + init_actions_uris=init_actions_uris, + optional_components=optional_components, + additional_metadata=additional_metadata, + additional_properties=additional_properties, + install_component_gateway=install_component_gateway, + aws_conn_id=aws_conn_id, + gcp_conn_id=gcp_conn_id, + project_id=project_id, + artifact_bucket=artifact_bucket, + storage_bucket=storage_bucket, + master_disk_type=master_disk_type, + master_disk_size=master_disk_size, + worker_disk_type=worker_disk_type, + worker_disk_size=worker_disk_size, + master_num_local_ssds=master_num_local_ssds, + worker_num_local_ssds=worker_num_local_ssds, + ) + + _dag_name = f"{parent_dag_name}.{dag_name}" with models.DAG(_dag_name, default_args=default_args) as dag: create_dataproc_cluster = dataproc_helper.create_cluster() run_pyspark_on_dataproc = DataprocSubmitPySparkJobOperator( - task_id='run_dataproc_pyspark', + task_id="run_dataproc_pyspark", job_name=job_name, cluster_name=cluster_name, region=region, main=python_driver_code, arguments=py_args, gcp_conn_id=gcp_conn_id, - project_id=project_id + project_id=project_id, ) delete_dataproc_cluster = dataproc_helper.delete_cluster() create_dataproc_cluster >> run_pyspark_on_dataproc >> delete_dataproc_cluster return dag -# End moz_dataproc_pyspark_runner -def moz_dataproc_jar_runner(parent_dag_name=None, - dag_name='run_script_on_dataproc', - default_args=None, - cluster_name=None, - num_workers=2, - image_version='1.4-debian10', - region='us-west1', - subnetwork_uri=None, - internal_ip_only=None, - idle_delete_ttl=14400, - auto_delete_ttl=28800, - master_machine_type='n1-standard-8', - worker_machine_type='n1-standard-4', - num_preemptible_workers=0, - service_account='dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com', - init_actions_uris=None, - optional_components=['ANACONDA'], - install_component_gateway=True, - jar_urls=None, - main_class=None, - jar_args=None, - job_name=None, - aws_conn_id=None, - gcp_conn_id='google_cloud_airflow_dataproc', - project_id='airflow-dataproc', - master_disk_type='pd-standard', - worker_disk_type='pd-standard', - master_disk_size=1024, - worker_disk_size=1024, - master_num_local_ssds=0, - worker_num_local_ssds=0, - ): +# End moz_dataproc_pyspark_runner + +def moz_dataproc_jar_runner( + parent_dag_name=None, + dag_name="run_script_on_dataproc", + default_args=None, + cluster_name=None, + num_workers=2, + image_version="1.4-debian10", + region="us-west1", + subnetwork_uri=None, + internal_ip_only=None, + idle_delete_ttl=14400, + auto_delete_ttl=28800, + master_machine_type="n1-standard-8", + worker_machine_type="n1-standard-4", + num_preemptible_workers=0, + service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com", + init_actions_uris=None, + optional_components=None, + install_component_gateway=True, + jar_urls=None, + main_class=None, + jar_args=None, + job_name=None, + aws_conn_id=None, + gcp_conn_id="google_cloud_airflow_dataproc", + project_id="airflow-dataproc", + master_disk_type="pd-standard", + worker_disk_type="pd-standard", + master_disk_size=1024, + worker_disk_size=1024, + master_num_local_ssds=0, + worker_num_local_ssds=0, +): """ - This will initially create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway. + Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway. + Then we call DataprocSubmitSparkJobOperator to execute the jar defined by the arguments jar_urls and main_class. Once that succeeds, we teardown the cluster. @@ -477,37 +495,42 @@ def moz_dataproc_jar_runner(parent_dag_name=None, """ + if optional_components is None: + optional_components = ["ANACONDA"] if cluster_name is None or jar_urls is None or main_class is None: - raise AirflowException('Please specify cluster_name, jar_urls, and/or main_class.') - - dataproc_helper = DataProcHelper(cluster_name=cluster_name, - job_name=job_name, - num_workers=num_workers, - image_version=image_version, - region=region, - subnetwork_uri=subnetwork_uri, - internal_ip_only=internal_ip_only, - idle_delete_ttl=idle_delete_ttl, - auto_delete_ttl=auto_delete_ttl, - master_machine_type=master_machine_type, - worker_machine_type=worker_machine_type, - num_preemptible_workers=num_preemptible_workers, - service_account=service_account, - init_actions_uris=init_actions_uris, - optional_components=optional_components, - install_component_gateway=install_component_gateway, - aws_conn_id=aws_conn_id, - gcp_conn_id=gcp_conn_id, - project_id=project_id, - master_disk_type=master_disk_type, - master_disk_size=master_disk_size, - worker_disk_type=worker_disk_type, - worker_disk_size=worker_disk_size, - master_num_local_ssds=master_num_local_ssds, - worker_num_local_ssds=worker_num_local_ssds, - ) - - _dag_name = '{}.{}'.format(parent_dag_name, dag_name) + raise AirflowException( + "Please specify cluster_name, jar_urls, and/or main_class." + ) + + dataproc_helper = DataProcHelper( + cluster_name=cluster_name, + job_name=job_name, + num_workers=num_workers, + image_version=image_version, + region=region, + subnetwork_uri=subnetwork_uri, + internal_ip_only=internal_ip_only, + idle_delete_ttl=idle_delete_ttl, + auto_delete_ttl=auto_delete_ttl, + master_machine_type=master_machine_type, + worker_machine_type=worker_machine_type, + num_preemptible_workers=num_preemptible_workers, + service_account=service_account, + init_actions_uris=init_actions_uris, + optional_components=optional_components, + install_component_gateway=install_component_gateway, + aws_conn_id=aws_conn_id, + gcp_conn_id=gcp_conn_id, + project_id=project_id, + master_disk_type=master_disk_type, + master_disk_size=master_disk_size, + worker_disk_type=worker_disk_type, + worker_disk_size=worker_disk_size, + master_num_local_ssds=master_num_local_ssds, + worker_num_local_ssds=worker_num_local_ssds, + ) + + _dag_name = f"{parent_dag_name}.{dag_name}" with models.DAG(_dag_name, default_args=default_args) as dag: create_dataproc_cluster = dataproc_helper.create_cluster() @@ -515,62 +538,65 @@ def moz_dataproc_jar_runner(parent_dag_name=None, run_jar_on_dataproc = DataprocSubmitSparkJobOperator( cluster_name=cluster_name, region=region, - task_id='run_jar_on_dataproc', + task_id="run_jar_on_dataproc", job_name=job_name, dataproc_jars=jar_urls, main_class=main_class, arguments=jar_args, gcp_conn_id=gcp_conn_id, - project_id=project_id + project_id=project_id, ) delete_dataproc_cluster = dataproc_helper.delete_cluster() create_dataproc_cluster >> run_jar_on_dataproc >> delete_dataproc_cluster return dag + + # End moz_dataproc_jar_runner def _format_envvar(env=None): # Use a default value if an environment dictionary isn't supplied - return ' '.join(['{}={}'.format(k, v) for k, v in (env or {}).items()]) - - -def moz_dataproc_scriptrunner(parent_dag_name=None, - dag_name='run_script_on_dataproc', - default_args=None, - cluster_name=None, - num_workers=2, - image_version='1.4-debian10', - region='us-west1', - subnetwork_uri=None, - internal_ip_only=None, - idle_delete_ttl=14400, - auto_delete_ttl=28800, - master_machine_type='n1-standard-8', - worker_machine_type='n1-standard-4', - num_preemptible_workers=0, - service_account='dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com', - init_actions_uris=None, - optional_components=['ANACONDA'], - install_component_gateway=True, - uri=None, - env=None, - arguments=None, - job_name=None, - aws_conn_id=None, - gcp_conn_id='google_cloud_airflow_dataproc', - project_id='airflow-dataproc', - master_disk_type='pd-standard', - worker_disk_type='pd-standard', - master_disk_size=1024, - worker_disk_size=1024, - master_num_local_ssds=0, - worker_num_local_ssds=0, - ): - + return " ".join([f"{k}={v}" for k, v in (env or {}).items()]) + + +def moz_dataproc_scriptrunner( + parent_dag_name=None, + dag_name="run_script_on_dataproc", + default_args=None, + cluster_name=None, + num_workers=2, + image_version="1.4-debian10", + region="us-west1", + subnetwork_uri=None, + internal_ip_only=None, + idle_delete_ttl=14400, + auto_delete_ttl=28800, + master_machine_type="n1-standard-8", + worker_machine_type="n1-standard-4", + num_preemptible_workers=0, + service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com", + init_actions_uris=None, + optional_components=None, + install_component_gateway=True, + uri=None, + env=None, + arguments=None, + job_name=None, + aws_conn_id=None, + gcp_conn_id="google_cloud_airflow_dataproc", + project_id="airflow-dataproc", + master_disk_type="pd-standard", + worker_disk_type="pd-standard", + master_disk_size=1024, + worker_disk_size=1024, + master_num_local_ssds=0, + worker_num_local_ssds=0, +): """ - This will initially create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway. + Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway. + Then we execute a script uri (either https or gcs) similar to how we use our custom AWS EmrSparkOperator. This will call DataprocSubmitSparkJobOperator using EMR's script-runner.jar, which then executes the airflow_gcp.sh entrypoint script. The entrypoint script expects another @@ -622,51 +648,57 @@ def moz_dataproc_scriptrunner(parent_dag_name=None, """ + if optional_components is None: + optional_components = ["ANACONDA"] if job_name is None or uri is None or cluster_name is None: - raise AirflowException('Please specify job_name, uri, and cluster_name.') - - dataproc_helper = DataProcHelper(cluster_name=cluster_name, - job_name=job_name, - num_workers=num_workers, - image_version=image_version, - region=region, - subnetwork_uri=subnetwork_uri, - internal_ip_only=internal_ip_only, - idle_delete_ttl=idle_delete_ttl, - auto_delete_ttl=auto_delete_ttl, - master_machine_type=master_machine_type, - worker_machine_type=worker_machine_type, - num_preemptible_workers=num_preemptible_workers, - service_account=service_account, - init_actions_uris=init_actions_uris, - optional_components=optional_components, - install_component_gateway=install_component_gateway, - aws_conn_id=aws_conn_id, - gcp_conn_id=gcp_conn_id, - project_id=project_id, - master_disk_type=master_disk_type, - master_disk_size=master_disk_size, - worker_disk_type=worker_disk_type, - worker_disk_size=worker_disk_size, - master_num_local_ssds=master_num_local_ssds, - worker_num_local_ssds=worker_num_local_ssds, - ) - - _dag_name = '{}.{}'.format(parent_dag_name, dag_name) + raise AirflowException("Please specify job_name, uri, and cluster_name.") + + dataproc_helper = DataProcHelper( + cluster_name=cluster_name, + job_name=job_name, + num_workers=num_workers, + image_version=image_version, + region=region, + subnetwork_uri=subnetwork_uri, + internal_ip_only=internal_ip_only, + idle_delete_ttl=idle_delete_ttl, + auto_delete_ttl=auto_delete_ttl, + master_machine_type=master_machine_type, + worker_machine_type=worker_machine_type, + num_preemptible_workers=num_preemptible_workers, + service_account=service_account, + init_actions_uris=init_actions_uris, + optional_components=optional_components, + install_component_gateway=install_component_gateway, + aws_conn_id=aws_conn_id, + gcp_conn_id=gcp_conn_id, + project_id=project_id, + master_disk_type=master_disk_type, + master_disk_size=master_disk_size, + worker_disk_type=worker_disk_type, + worker_disk_size=worker_disk_size, + master_num_local_ssds=master_num_local_ssds, + worker_num_local_ssds=worker_num_local_ssds, + ) + + _dag_name = f"{parent_dag_name}.{dag_name}" environment = _format_envvar(env) - script_bucket = 'moz-fx-data-prod-airflow-dataproc-artifacts' - jar_url = 'gs://{}/bin/script-runner.jar'.format(script_bucket) + script_bucket = "moz-fx-data-prod-airflow-dataproc-artifacts" + jar_url = f"gs://{script_bucket}/bin/script-runner.jar" args = [ - 'gs://{}/bootstrap/airflow_gcp.sh'.format(script_bucket), - '--job-name', job_name, - '--uri', uri, - '--environment', environment + f"gs://{script_bucket}/bootstrap/airflow_gcp.sh", + "--job-name", + job_name, + "--uri", + uri, + "--environment", + environment, ] if arguments: - args += ['--arguments', arguments] + args += ["--arguments", arguments] with models.DAG(_dag_name, default_args=default_args) as dag: create_dataproc_cluster = dataproc_helper.create_cluster() @@ -676,24 +708,27 @@ def moz_dataproc_scriptrunner(parent_dag_name=None, run_script_on_dataproc = DataprocSubmitSparkJobOperator( cluster_name=cluster_name, region=region, - task_id='run_script_on_dataproc', + task_id="run_script_on_dataproc", job_name=job_name, dataproc_jars=[jar_url], - main_class='com.amazon.elasticmapreduce.scriptrunner.ScriptRunner', + main_class="com.amazon.elasticmapreduce.scriptrunner.ScriptRunner", arguments=args, gcp_conn_id=gcp_conn_id, - project_id=project_id + project_id=project_id, ) delete_dataproc_cluster = dataproc_helper.delete_cluster() create_dataproc_cluster >> run_script_on_dataproc >> delete_dataproc_cluster return dag + + # End moz_dataproc_scriptrunner def copy_artifacts_dev(dag, project_id, artifact_bucket, storage_bucket): - """Bootstrap a dataproc job for local testing. + """ + Bootstrap a dataproc job for local testing. This job requires setting GOOGLE_APPLICATION_CREDENTIALS before starting the airflow container. It will copy the contents of the local jobs and @@ -747,14 +782,16 @@ def copy_artifacts_dev(dag, project_id, artifact_bucket, storage_bucket): def get_dataproc_parameters(conn_id="google_cloud_airflow_dataproc"): - """This function can be used to gather parameters that correspond to development - parameters. The provided connection string should be a Google Cloud connection + """ + Can be used to gather parameters that correspond to development parameters. + + The provided connection string should be a Google Cloud connection and should either be the production default ("dataproc-runner-prod"), or a service key associated with a sandbox account. """ dev_project_id = "replace_me" dev_client_email = "replace_me" - + is_dev = os.environ.get("DEPLOY_ENVIRONMENT") == "dev" project_id = "airflow-dataproc" if is_dev else dev_project_id client_email = ( @@ -763,18 +800,16 @@ def get_dataproc_parameters(conn_id="google_cloud_airflow_dataproc"): else "dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com" ) artifact_bucket = ( - "{}-dataproc-artifacts".format(project_id) + f"{project_id}-dataproc-artifacts" if is_dev else "moz-fx-data-prod-airflow-dataproc-artifacts" ) storage_bucket = ( - "{}-dataproc-scratch".format(project_id) + f"{project_id}-dataproc-scratch" if is_dev else "moz-fx-data-prod-dataproc-scratch" ) - output_bucket = ( - artifact_bucket if is_dev else "airflow-dataproc-bq-parquet-exports" - ) + output_bucket = artifact_bucket if is_dev else "airflow-dataproc-bq-parquet-exports" return DataprocParameters( conn_id, project_id, @@ -782,5 +817,5 @@ def get_dataproc_parameters(conn_id="google_cloud_airflow_dataproc"): client_email, artifact_bucket, storage_bucket, - output_bucket + output_bucket, ) diff --git a/dags/utils/gcp.py b/utils/gcp.py similarity index 99% rename from dags/utils/gcp.py rename to utils/gcp.py index 28a5af50e..ae6192365 100644 --- a/dags/utils/gcp.py +++ b/utils/gcp.py @@ -14,8 +14,8 @@ BigQueryToGCSOperator, ) -from dags.operators.gcp_container_operator import GKEPodOperator -from dags.utils.dataproc import get_dataproc_parameters +from operators.gcp_container_operator import GKEPodOperator +from utils.dataproc import get_dataproc_parameters GCP_PROJECT_ID = "moz-fx-data-airflow-gke-prod" DATAPROC_PROJECT_ID = "airflow-dataproc" diff --git a/dags/utils/gke.py b/utils/gke.py similarity index 97% rename from dags/utils/gke.py rename to utils/gke.py index 4ee451ffa..94344ad57 100644 --- a/dags/utils/gke.py +++ b/utils/gke.py @@ -11,10 +11,8 @@ def create_gke_config( subnetwork="default", is_dev=False, ): - """ - Helper function to create gke cluster definition dict. All fields must match - their protobuf definitions. + Create gke cluster definition dict. All fields must match their protobuf definitions. See: https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1beta1/projects.locations.clusters#Cluster diff --git a/dags/utils/acoustic/__init__.py b/utils/glam_subdags/__init__.py similarity index 100% rename from dags/utils/acoustic/__init__.py rename to utils/glam_subdags/__init__.py diff --git a/dags/glam_subdags/extract.py b/utils/glam_subdags/extract.py similarity index 99% rename from dags/glam_subdags/extract.py rename to utils/glam_subdags/extract.py index fbe3145b6..c7e9b0838 100644 --- a/dags/glam_subdags/extract.py +++ b/utils/glam_subdags/extract.py @@ -4,6 +4,7 @@ from airflow.providers.google.cloud.transfers.bigquery_to_gcs import ( BigQueryToGCSOperator, ) + from utils.gcp import bigquery_etl_query gcp_conn_id = "google_cloud_airflow_dataproc" diff --git a/dags/glam_subdags/general.py b/utils/glam_subdags/general.py similarity index 99% rename from dags/glam_subdags/general.py rename to utils/glam_subdags/general.py index 743b2e62e..eaab4d43a 100644 --- a/dags/glam_subdags/general.py +++ b/utils/glam_subdags/general.py @@ -1,4 +1,5 @@ from airflow.models import DAG + from utils.gcp import bigquery_etl_query diff --git a/dags/glam_subdags/generate_query.py b/utils/glam_subdags/generate_query.py similarity index 100% rename from dags/glam_subdags/generate_query.py rename to utils/glam_subdags/generate_query.py diff --git a/dags/glam_subdags/histograms.py b/utils/glam_subdags/histograms.py similarity index 99% rename from dags/glam_subdags/histograms.py rename to utils/glam_subdags/histograms.py index 40aa4009d..d84cc7882 100644 --- a/dags/glam_subdags/histograms.py +++ b/utils/glam_subdags/histograms.py @@ -1,4 +1,5 @@ from airflow.models import DAG + from utils.gcp import bigquery_etl_query GLAM_HISTOGRAM_AGGREGATES_FINAL_SUBDAG = "clients_histogram_aggregates" diff --git a/dags/glam_subdags/probe_hotlist.py b/utils/glam_subdags/probe_hotlist.py similarity index 94% rename from dags/glam_subdags/probe_hotlist.py rename to utils/glam_subdags/probe_hotlist.py index e7896bd38..f083fb99e 100644 --- a/dags/glam_subdags/probe_hotlist.py +++ b/utils/glam_subdags/probe_hotlist.py @@ -11,6 +11,8 @@ def update_hotlist( **kwargs, ): """ + Update hotlist. + :param task_id: Airflow task id :param project_id: GCP project to write to :param source_dataset_id: Bigquery dataset to read from in queries @@ -26,9 +28,7 @@ def update_hotlist( "DATASET": destination_dataset_id, "SUBMISSION_DATE": "{{ ds }}", } - command = [ - "script/glam/update_probe_hotlist" - ] + command = ["script/glam/update_probe_hotlist"] return gke_command( task_id=task_id, cmds=["bash"], diff --git a/dags/utils/patched/__init__.py b/utils/patched/__init__.py similarity index 100% rename from dags/utils/patched/__init__.py rename to utils/patched/__init__.py diff --git a/dags/utils/patched/dataproc_hook.py b/utils/patched/dataproc_hook.py similarity index 79% rename from dags/utils/patched/dataproc_hook.py rename to utils/patched/dataproc_hook.py index e72caf292..70540ad69 100644 --- a/dags/utils/patched/dataproc_hook.py +++ b/utils/patched/dataproc_hook.py @@ -16,9 +16,9 @@ # specific language governing permissions and limitations # under the License. # -"""This module contains a Google Cloud Dataproc hook.""" - """ +Contains a Google Cloud Dataproc hook. + I have copy pasted this from the providers-google/5.0.0 branch https://github.com/apache/airflow/blob/providers-google/5.0.0/airflow/providers/google/cloud/hooks/dataproc.py @@ -34,10 +34,14 @@ import time import uuid import warnings -from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union +from collections.abc import Iterable, Sequence +from airflow.exceptions import AirflowException +from airflow.providers.google.common.hooks.base_google import GoogleBaseHook +from airflow.version import version as airflow_version from google.api_core.exceptions import ServerError from google.api_core.retry import Retry + # Moz specific - import has changed from google.cloud.dataproc_v1 import ( Cluster, @@ -51,10 +55,6 @@ from google.protobuf.duration_pb2 import Duration from google.protobuf.field_mask_pb2 import FieldMask -from airflow.exceptions import AirflowException -from airflow.providers.google.common.hooks.base_google import GoogleBaseHook -from airflow.version import version as airflow_version - class DataProcJobBuilder: """A helper class for building Dataproc job.""" @@ -65,7 +65,7 @@ def __init__( task_id: str, cluster_name: str, job_type: str, - properties: Optional[Dict[str, str]] = None, + properties: dict[str, str] | None = None, ) -> None: name = task_id + "_" + str(uuid.uuid4())[:8] self.job_type = job_type @@ -73,7 +73,10 @@ def __init__( "job": { "reference": {"project_id": project_id, "job_id": name}, "placement": {"cluster_name": cluster_name}, - "labels": {'airflow-version': 'v' + airflow_version.replace('.', '-').replace('+', '-')}, + "labels": { + "airflow-version": "v" + + airflow_version.replace(".", "-").replace("+", "-") + }, job_type: {}, } } # type: Dict[str, Any] @@ -90,7 +93,7 @@ def add_labels(self, labels: dict) -> None: if labels: self.job["job"]["labels"].update(labels) - def add_variables(self, variables: List[str]) -> None: + def add_variables(self, variables: list[str]) -> None: """ Set variables for Dataproc job. @@ -100,7 +103,7 @@ def add_variables(self, variables: List[str]) -> None: if variables is not None: self.job["job"][self.job_type]["script_variables"] = variables - def add_args(self, args: List[str]) -> None: + def add_args(self, args: list[str]) -> None: """ Set args for Dataproc job. @@ -110,14 +113,14 @@ def add_args(self, args: List[str]) -> None: if args is not None: self.job["job"][self.job_type]["args"] = args - def add_query(self, query: List[str]) -> None: + def add_query(self, query: list[str]) -> None: """ Set query uris for Dataproc job. :param query: URIs for the job queries. :type query: List[str] """ - self.job["job"][self.job_type]["query_list"] = {'queries': [query]} + self.job["job"][self.job_type]["query_list"] = {"queries": [query]} def add_query_uri(self, query_uri: str) -> None: """ @@ -128,7 +131,7 @@ def add_query_uri(self, query_uri: str) -> None: """ self.job["job"][self.job_type]["query_file_uri"] = query_uri - def add_jar_file_uris(self, jars: List[str]) -> None: + def add_jar_file_uris(self, jars: list[str]) -> None: """ Set jars uris for Dataproc job. @@ -138,7 +141,7 @@ def add_jar_file_uris(self, jars: List[str]) -> None: if jars is not None: self.job["job"][self.job_type]["jar_file_uris"] = jars - def add_archive_uris(self, archives: List[str]) -> None: + def add_archive_uris(self, archives: list[str]) -> None: """ Set archives uris for Dataproc job. @@ -148,7 +151,7 @@ def add_archive_uris(self, archives: List[str]) -> None: if archives is not None: self.job["job"][self.job_type]["archive_uris"] = archives - def add_file_uris(self, files: List[str]) -> None: + def add_file_uris(self, files: list[str]) -> None: """ Set file uris for Dataproc job. @@ -158,7 +161,7 @@ def add_file_uris(self, files: List[str]) -> None: if files is not None: self.job["job"][self.job_type]["file_uris"] = files - def add_python_file_uris(self, pyfiles: List[str]) -> None: + def add_python_file_uris(self, pyfiles: list[str]) -> None: """ Set python file uris for Dataproc job. @@ -168,7 +171,7 @@ def add_python_file_uris(self, pyfiles: List[str]) -> None: if pyfiles is not None: self.job["job"][self.job_type]["python_file_uris"] = pyfiles - def set_main(self, main_jar: Optional[str], main_class: Optional[str]) -> None: + def set_main(self, main_jar: str | None, main_class: str | None) -> None: """ Set Dataproc main class. @@ -203,9 +206,9 @@ def set_job_name(self, name: str) -> None: """ self.job["job"]["reference"]["job_id"] = name + "_" + str(uuid.uuid4())[:8] - def build(self) -> Dict: + def build(self) -> dict: """ - Returns Dataproc job. + Return Dataproc job. :return: Dataproc job :rtype: dict @@ -222,9 +225,9 @@ class DataprocHook(GoogleBaseHook): """ def get_cluster_client( - self, region: Optional[str] = None, location: Optional[str] = None + self, region: str | None = None, location: str | None = None ) -> ClusterControllerClient: - """Returns ClusterControllerClient.""" + """Return ClusterControllerClient.""" if location is not None: warnings.warn( "Parameter `location` will be deprecated. " @@ -234,17 +237,19 @@ def get_cluster_client( ) region = location client_options = None - if region and region != 'global': - client_options = {'api_endpoint': f'{region}-dataproc.googleapis.com:443'} + if region and region != "global": + client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"} return ClusterControllerClient( - credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options + credentials=self._get_credentials(), + client_info=self.client_info, + client_options=client_options, ) def get_template_client( - self, region: Optional[str] = None, location: Optional[str] = None + self, region: str | None = None, location: str | None = None ) -> WorkflowTemplateServiceClient: - """Returns WorkflowTemplateServiceClient.""" + """Return WorkflowTemplateServiceClient.""" if location is not None: warnings.warn( "Parameter `location` will be deprecated. " @@ -254,17 +259,19 @@ def get_template_client( ) region = location client_options = None - if region and region != 'global': - client_options = {'api_endpoint': f'{region}-dataproc.googleapis.com:443'} + if region and region != "global": + client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"} return WorkflowTemplateServiceClient( - credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options + credentials=self._get_credentials(), + client_info=self.client_info, + client_options=client_options, ) def get_job_client( - self, region: Optional[str] = None, location: Optional[str] = None + self, region: str | None = None, location: str | None = None ) -> JobControllerClient: - """Returns JobControllerClient.""" + """Return JobControllerClient.""" if location is not None: warnings.warn( "Parameter `location` will be deprecated. " @@ -274,11 +281,13 @@ def get_job_client( ) region = location client_options = None - if region and region != 'global': - client_options = {'api_endpoint': f'{region}-dataproc.googleapis.com:443'} + if region and region != "global": + client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"} return JobControllerClient( - credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options + credentials=self._get_credentials(), + client_info=self.client_info, + client_options=client_options, ) @GoogleBaseHook.fallback_to_default_project_id @@ -287,15 +296,15 @@ def create_cluster( region: str, project_id: str, cluster_name: str, - cluster_config: Union[Dict, Cluster], - labels: Optional[Dict[str, str]] = None, - request_id: Optional[str] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + cluster_config: dict | Cluster, + labels: dict[str, str] | None = None, + request_id: str | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ): """ - Creates a cluster in a project. + Create a cluster in a project. :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to. :type project_id: str @@ -326,7 +335,12 @@ def create_cluster( # [a-z]([-a-z0-9]*[a-z0-9])? (current airflow version string follows # semantic versioning spec: x.y.z). labels = labels or {} - labels.update({'airflow-version': 'v' + airflow_version.replace('.', '-').replace('+', '-')}) + labels.update( + { + "airflow-version": "v" + + airflow_version.replace(".", "-").replace("+", "-") + } + ) cluster = { "project_id": project_id, @@ -338,10 +352,10 @@ def create_cluster( client = self.get_cluster_client(region=region) result = client.create_cluster( request={ - 'project_id': project_id, - 'region': region, - 'cluster': cluster, - 'request_id': request_id, + "project_id": project_id, + "region": region, + "cluster": cluster, + "request_id": request_id, }, retry=retry, timeout=timeout, @@ -355,14 +369,14 @@ def delete_cluster( region: str, cluster_name: str, project_id: str, - cluster_uuid: Optional[str] = None, - request_id: Optional[str] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + cluster_uuid: str | None = None, + request_id: str | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ): """ - Deletes a cluster in a project. + Delete a cluster in a project. :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to. :type project_id: str @@ -389,11 +403,11 @@ def delete_cluster( client = self.get_cluster_client(region=region) result = client.delete_cluster( request={ - 'project_id': project_id, - 'region': region, - 'cluster_name': cluster_name, - 'cluster_uuid': cluster_uuid, - 'request_id': request_id, + "project_id": project_id, + "region": region, + "cluster_name": cluster_name, + "cluster_uuid": cluster_uuid, + "request_id": request_id, }, retry=retry, timeout=timeout, @@ -407,13 +421,12 @@ def diagnose_cluster( region: str, cluster_name: str, project_id: str, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ): """ - Gets cluster diagnostic information. After the operation completes GCS uri to - diagnose is returned + Get cluster diagnostic information. After the operation completes GCS uri to diagnose is returned. :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to. :type project_id: str @@ -432,7 +445,11 @@ def diagnose_cluster( """ client = self.get_cluster_client(region=region) operation = client.diagnose_cluster( - request={'project_id': project_id, 'region': region, 'cluster_name': cluster_name}, + request={ + "project_id": project_id, + "region": region, + "cluster_name": cluster_name, + }, retry=retry, timeout=timeout, metadata=metadata, @@ -447,12 +464,12 @@ def get_cluster( region: str, cluster_name: str, project_id: str, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ): """ - Gets the resource representation for a cluster in a project. + Get the resource representation for a cluster in a project. :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to. :type project_id: str @@ -471,7 +488,11 @@ def get_cluster( """ client = self.get_cluster_client(region=region) result = client.get_cluster( - request={'project_id': project_id, 'region': region, 'cluster_name': cluster_name}, + request={ + "project_id": project_id, + "region": region, + "cluster_name": cluster_name, + }, retry=retry, timeout=timeout, metadata=metadata, @@ -484,13 +505,13 @@ def list_clusters( region: str, filter_: str, project_id: str, - page_size: Optional[int] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + page_size: int | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ): """ - Lists all regions/{region}/clusters in a project. + List all regions/{region}/clusters in a project. :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to. :type project_id: str @@ -513,7 +534,12 @@ def list_clusters( """ client = self.get_cluster_client(region=region) result = client.list_clusters( - request={'project_id': project_id, 'region': region, 'filter': filter_, 'page_size': page_size}, + request={ + "project_id": project_id, + "region": region, + "filter": filter_, + "page_size": page_size, + }, retry=retry, timeout=timeout, metadata=metadata, @@ -524,19 +550,19 @@ def list_clusters( def update_cluster( self, cluster_name: str, - cluster: Union[Dict, Cluster], - update_mask: Union[Dict, FieldMask], + cluster: dict | Cluster, + update_mask: dict | FieldMask, project_id: str, - region: str = None, - location: Optional[str] = None, - graceful_decommission_timeout: Optional[Union[Dict, Duration]] = None, - request_id: Optional[str] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + region: str | None = None, + location: str | None = None, + graceful_decommission_timeout: dict | Duration | None = None, + request_id: str | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ): """ - Updates a cluster in a project. + Update a cluster in a project. :param project_id: Required. The ID of the Google Cloud project the cluster belongs to. :type project_id: str @@ -609,13 +635,13 @@ def update_cluster( client = self.get_cluster_client(region=region) operation = client.update_cluster( request={ - 'project_id': project_id, - 'region': region, - 'cluster_name': cluster_name, - 'cluster': cluster, - 'update_mask': update_mask, - 'graceful_decommission_timeout': graceful_decommission_timeout, - 'request_id': request_id, + "project_id": project_id, + "region": region, + "cluster_name": cluster_name, + "cluster": cluster, + "update_mask": update_mask, + "graceful_decommission_timeout": graceful_decommission_timeout, + "request_id": request_id, }, retry=retry, timeout=timeout, @@ -626,16 +652,16 @@ def update_cluster( @GoogleBaseHook.fallback_to_default_project_id def create_workflow_template( self, - template: Union[Dict, WorkflowTemplate], + template: dict | WorkflowTemplate, project_id: str, - region: str = None, - location: Optional[str] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + region: str | None = None, + location: str | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ) -> WorkflowTemplate: """ - Creates new workflow template. + Create new workflow template. :param project_id: Required. The ID of the Google Cloud project the cluster belongs to. :type project_id: str @@ -668,9 +694,12 @@ def create_workflow_template( raise TypeError("missing 1 required keyword argument: 'region'") metadata = metadata or () client = self.get_template_client(region) - parent = f'projects/{project_id}/regions/{region}' + parent = f"projects/{project_id}/regions/{region}" return client.create_workflow_template( - request={'parent': parent, 'template': template}, retry=retry, timeout=timeout, metadata=metadata + request={"parent": parent, "template": template}, + retry=retry, + timeout=timeout, + metadata=metadata, ) @GoogleBaseHook.fallback_to_default_project_id @@ -678,17 +707,17 @@ def instantiate_workflow_template( self, template_name: str, project_id: str, - region: str = None, - location: Optional[str] = None, - version: Optional[int] = None, - request_id: Optional[str] = None, - parameters: Optional[Dict[str, str]] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + region: str | None = None, + location: str | None = None, + version: int | None = None, + request_id: str | None = None, + parameters: dict[str, str] | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ): # pylint: disable=too-many-arguments """ - Instantiates a template and begins execution. + Instantiate a template and begins execution. :param template_name: Name of template to instantiate. :type template_name: str @@ -733,9 +762,16 @@ def instantiate_workflow_template( raise TypeError("missing 1 required keyword argument: 'region'") metadata = metadata or () client = self.get_template_client(region) - name = f'projects/{project_id}/regions/{region}/workflowTemplates/{template_name}' + name = ( + f"projects/{project_id}/regions/{region}/workflowTemplates/{template_name}" + ) operation = client.instantiate_workflow_template( - request={'name': name, 'version': version, 'request_id': request_id, 'parameters': parameters}, + request={ + "name": name, + "version": version, + "request_id": request_id, + "parameters": parameters, + }, retry=retry, timeout=timeout, metadata=metadata, @@ -745,17 +781,17 @@ def instantiate_workflow_template( @GoogleBaseHook.fallback_to_default_project_id def instantiate_inline_workflow_template( self, - template: Union[Dict, WorkflowTemplate], + template: dict | WorkflowTemplate, project_id: str, - region: str = None, - location: Optional[str] = None, - request_id: Optional[str] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + region: str | None = None, + location: str | None = None, + request_id: str | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ): """ - Instantiates a template and begins execution. + Instantiate a template and begins execution. :param template: The workflow template to instantiate. If a dict is provided, it must be of the same form as the protobuf message WorkflowTemplate @@ -792,9 +828,9 @@ def instantiate_inline_workflow_template( raise TypeError("missing 1 required keyword argument: 'region'") metadata = metadata or () client = self.get_template_client(region) - parent = f'projects/{project_id}/regions/{region}' + parent = f"projects/{project_id}/regions/{region}" operation = client.instantiate_inline_workflow_template( - request={'parent': parent, 'template': template, 'request_id': request_id}, + request={"parent": parent, "template": template, "request_id": request_id}, retry=retry, timeout=timeout, metadata=metadata, @@ -807,12 +843,12 @@ def wait_for_job( job_id: str, project_id: str, wait_time: int = 10, - region: str = None, - location: Optional[str] = None, - timeout: Optional[int] = None, + region: str | None = None, + location: str | None = None, + timeout: int | None = None, ) -> None: """ - Helper method which polls a job to check if it finishes. + Poll a job to check if it finishes. :param job_id: Id of the Dataproc job :type job_id: str @@ -840,34 +876,43 @@ def wait_for_job( raise TypeError("missing 1 required keyword argument: 'region'") state = None start = time.monotonic() - while state not in (JobStatus.State.ERROR, JobStatus.State.DONE, JobStatus.State.CANCELLED): + while state not in ( + JobStatus.State.ERROR, + JobStatus.State.DONE, + JobStatus.State.CANCELLED, + ): if timeout and start + timeout < time.monotonic(): - raise AirflowException(f"Timeout: dataproc job {job_id} is not ready after {timeout}s") + raise AirflowException( + f"Timeout: dataproc job {job_id} is not ready after {timeout}s" + ) time.sleep(wait_time) try: job = self.get_job(project_id=project_id, region=region, job_id=job_id) state = job.status.state except ServerError as err: - self.log.info("Retrying. Dataproc API returned server error when waiting for job: %s", err) + self.log.info( + "Retrying. Dataproc API returned server error when waiting for job: %s", + err, + ) if state == JobStatus.State.ERROR: - raise AirflowException(f'Job failed:\n{job}') + raise AirflowException(f"Job failed:\n{job}") if state == JobStatus.State.CANCELLED: - raise AirflowException(f'Job was cancelled:\n{job}') + raise AirflowException(f"Job was cancelled:\n{job}") @GoogleBaseHook.fallback_to_default_project_id def get_job( self, job_id: str, project_id: str, - region: str = None, - location: Optional[str] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + region: str | None = None, + location: str | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ) -> Job: """ - Gets the resource representation for a job in a project. + Get the resource representation for a job in a project. :param job_id: Id of the Dataproc job :type job_id: str @@ -899,7 +944,7 @@ def get_job( raise TypeError("missing 1 required keyword argument: 'region'") client = self.get_job_client(region=region) job = client.get_job( - request={'project_id': project_id, 'region': region, 'job_id': job_id}, + request={"project_id": project_id, "region": region, "job_id": job_id}, retry=retry, timeout=timeout, metadata=metadata, @@ -909,17 +954,17 @@ def get_job( @GoogleBaseHook.fallback_to_default_project_id def submit_job( self, - job: Union[dict, Job], + job: dict | Job, project_id: str, - region: str = None, - location: Optional[str] = None, - request_id: Optional[str] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + region: str | None = None, + location: str | None = None, + request_id: str | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ) -> Job: """ - Submits a job to a cluster. + Submit a job to a cluster. :param job: The job resource. If a dict is provided, it must be of the same form as the protobuf message Job @@ -956,7 +1001,12 @@ def submit_job( raise TypeError("missing 1 required keyword argument: 'region'") client = self.get_job_client(region=region) return client.submit_job( - request={'project_id': project_id, 'region': region, 'job': job, 'request_id': request_id}, + request={ + "project_id": project_id, + "region": region, + "job": job, + "request_id": request_id, + }, retry=retry, timeout=timeout, metadata=metadata, @@ -966,11 +1016,11 @@ def submit( self, project_id: str, job: dict, - region: str = 'global', - job_error_states: Optional[Iterable[str]] = None, + region: str = "global", + job_error_states: Iterable[str] | None = None, ) -> None: """ - Submits Google Cloud Dataproc job. + Submit Google Cloud Dataproc job. :param project_id: The id of Google Cloud Dataproc project. :type project_id: str @@ -982,7 +1032,11 @@ def submit( :type job_error_states: List[str] """ # TODO: Remover one day - warnings.warn("This method is deprecated. Please use `submit_job`", DeprecationWarning, stacklevel=2) + warnings.warn( + "This method is deprecated. Please use `submit_job`", + DeprecationWarning, + stacklevel=2, + ) job_object = self.submit_job(region=region, project_id=project_id, job=job) job_id = job_object.reference.job_id self.wait_for_job(job_id=job_id, region=region, project_id=project_id) @@ -992,14 +1046,14 @@ def cancel_job( self, job_id: str, project_id: str, - region: Optional[str] = None, - location: Optional[str] = None, - retry: Optional[Retry] = None, - timeout: Optional[float] = None, - metadata: Optional[Sequence[Tuple[str, str]]] = None, + region: str | None = None, + location: str | None = None, + retry: Retry | None = None, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] | None = None, ) -> Job: """ - Starts a job cancellation request. + Start a job cancellation request. :param project_id: Required. The ID of the Google Cloud project that the job belongs to. :type project_id: str @@ -1018,15 +1072,14 @@ def cancel_job( :param metadata: Additional metadata that is provided to the method. :type metadata: Sequence[Tuple[str, str]] """ - if region is None: - if location is not None: - warnings.warn( - "Parameter `location` will be deprecated. " - "Please provide value through `region` parameter instead.", - DeprecationWarning, - stacklevel=1, - ) - region = location + if region is None and location is not None: + warnings.warn( + "Parameter `location` will be deprecated. " + "Please provide value through `region` parameter instead.", + DeprecationWarning, + stacklevel=1, + ) + region = location if region is None: warnings.warn( @@ -1034,14 +1087,13 @@ def cancel_job( DeprecationWarning, stacklevel=2, ) - region = 'global' + region = "global" client = self.get_job_client(region=region) job = client.cancel_job( - request={'project_id': project_id, 'region': region, 'job_id': job_id}, + request={"project_id": project_id, "region": region, "job_id": job_id}, retry=retry, timeout=timeout, metadata=metadata, ) return job - diff --git a/dags/utils/slack.py b/utils/slack.py similarity index 72% rename from dags/utils/slack.py rename to utils/slack.py index 690dbf31b..4655a0f6c 100644 --- a/dags/utils/slack.py +++ b/utils/slack.py @@ -3,9 +3,10 @@ SLACK_CHANNEL = "#airflow-alerts" + def if_task_fails_alert_slack(context): failed_alert = SlackAPIPostOperator( - task_id='slack_failed', + task_id="slack_failed", channel=SLACK_CHANNEL, token=Variable.get("slack_secret_token"), text=""" @@ -14,9 +15,9 @@ def if_task_fails_alert_slack(context): *Dag*: {dag} *Date*: {ds} """.format( - task=context.get('task_instance').task_id, - dag=context.get('task_instance').dag_id, - ds=context.get('ds') - ) + task=context.get("task_instance").task_id, + dag=context.get("task_instance").dag_id, + ds=context.get("ds"), + ), ) return failed_alert.execute(context=context) diff --git a/dags/utils/tags.py b/utils/tags.py similarity index 85% rename from dags/utils/tags.py rename to utils/tags.py index 69a153961..20bf39896 100644 --- a/dags/utils/tags.py +++ b/utils/tags.py @@ -16,6 +16,10 @@ def __getattr__(self, item: str) -> str: Instead of Tag.ImpactTier.value.tier_1.value we can just use Tag.ImpactTier.tier_1. + Simplify accessing enum values. + + Instead of Tag.ImpactTier.value.tier_1.value we can just use + Tag.ImpactTier.tier_1. # source: https://newbedev.com/enum-of-enums-in-python """ @@ -27,6 +31,7 @@ def __getattr__(self, item: str) -> str: ret_val = getattr(self.value, item).value except AttributeError as _err: raise InvalidTagError(_err) from None + raise InvalidTagError() from _err return ret_val