Skip to content

Commit

Permalink
Various Goldsky Asset Factory Fixes (#1755)
Browse files: browse the repository at this point in the history
Fix asset factory annotations and checks
  • Loading branch information
ravenac95 authored Jul 3, 2024
1 parent 8a40a54 commit b3b2bd0
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
12 changes: 5 additions & 7 deletions warehouse/oso_dagster/factories/goldsky/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
TableRecord,
MetadataValue,
TableColumn,
- TableSchema,
+ TableSchema
)

from dagster_gcp import BigQueryResource, GCSResource
Expand Down Expand Up @@ -1019,7 +1019,7 @@ def generated_asset(
bigquery: BigQueryResource,
gcs: GCSResource,
cbt: CBTResource,
- ):
+ ) -> None:
context.log.info(f"Run ID: {context.run_id} AssetKey: {context.asset_key}")
materialize_asset(context, bigquery, gcs, cbt)

Expand All @@ -1032,7 +1032,7 @@ def goldsky_clean_up_op(
gcs: GCSResource,
cbt: CBTResource,
config: dict,
- ):
+ ) -> None:
print(config)
gs_asset = GoldskyAsset(gcs, bigquery, cbt, asset_config)
gs_asset.clean_up(context.log)
Expand All @@ -1044,7 +1044,7 @@ def goldsky_backfill_op(
gcs: GCSResource,
cbt: CBTResource,
config: dict,
- ):
+ ) -> None:
start_checkpoint = None
end_checkpoint = None
if "start" in config:
Expand All @@ -1067,8 +1067,6 @@ def goldsky_backfill_op(
),
pointer_table_suffix=op_input.backfill_label,
)
- # Hack for now.
- return "Done"

@op(name=f"{related_ops_prefix}_files_stats_op")
def goldsky_files_stats_op(
Expand Down Expand Up @@ -1137,7 +1135,7 @@ def goldsky_load_schema_op(
gcs: GCSResource,
cbt: CBTResource,
config: dict
- ):
+ ) -> None:
table_schema = TableSchema(
columns=[
TableColumn(
Expand Down
3 changes: 2 additions & 1 deletion warehouse/oso_dagster/factories/goldsky/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def get_range(self) -> Tuple[arrow.Arrow | None, arrow.Arrow | None]:
start = arrow.get(self.start) if self.start is not None else None
end = arrow.get(self.end) if self.end is not None else None
return (start, end)
- return (now.shift(days=-6), now.shift(days=-1))
+ # By default check the last 2 weeks of data
+ return (now.shift(days=-15), now.shift(days=-1))


def traces_check(
Expand Down

0 comments on commit b3b2bd0

Please sign in to comment.