Skip to content

Commit

Permalink
feat(DENG-2682): Update fivetran imports to use airflow-provider-five…
Browse files Browse the repository at this point in the history
…tran-async Airflow Fivetran provider (#1914)

* update fivetran imports to use airflow-provider-fivetran-async

* black and ruff errors fixed
  • Loading branch information
kik-kik authored Feb 21, 2024
1 parent 68bfbe9 commit ec271e2
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
18 changes: 9 additions & 9 deletions dags/casa.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from datetime import datetime, timedelta

from airflow import DAG
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

from utils.callbacks import retry_tasks_callback
from utils.tags import Tag

Expand Down Expand Up @@ -33,25 +34,24 @@
tags = [Tag.ImpactTier.tier_1]

with DAG(
'fivetran_casa',
"fivetran_casa",
default_args=default_args,
doc_md=DOCS,
schedule_interval="0 5 * * *",
tags=tags,
) as dag:

casa_sync_start = FivetranOperator(
connector_id='{{ var.value.fivetran_casa_connector_id }}',
task_id='casa-task',
connector_id="{{ var.value.fivetran_casa_connector_id }}",
task_id="casa-task",
)

casa_sync_wait = FivetranSensor(
connector_id='{{ var.value.fivetran_casa_connector_id }}',
task_id='casa-sensor',
connector_id="{{ var.value.fivetran_casa_connector_id }}",
task_id="casa-sensor",
poke_interval=30,
xcom="{{ task_instance.xcom_pull('casa-task') }}",
on_retry_callback=retry_tasks_callback,
params={'retry_tasks': ['casa-task']},
params={"retry_tasks": ["casa-task"]},
)

casa_sync_start >> casa_sync_wait
4 changes: 2 additions & 2 deletions dags/fivetran_acoustic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from airflow.hooks.base import BaseHook
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

from utils.acoustic.acoustic_client import AcousticClient
from utils.callbacks import retry_tasks_callback
Expand Down
2 changes: 1 addition & 1 deletion requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ apache-airflow[amazon,async,celery,cncf.kubernetes,github_enterprise,google_auth
apache-airflow-providers-google
apache-airflow-providers-http
apache-airflow-providers-slack
airflow-provider-fivetran==1.1.2
airflow-provider-fivetran-async==2.0.2

# Code quality
pytest==7.4.3
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
aiofiles==23.2.1
aiohttp==3.8.6
aiosignal==1.3.1
airflow-provider-fivetran==1.1.2
airflow-provider-fivetran-async==2.0.2
alembic==1.12.1
amqp==5.1.1
annotated-types==0.6.0
Expand Down

0 comments on commit ec271e2

Please sign in to comment.