docs: GCS factory assets (#1732)
* Added a guide for adding GCS-based assets
* Added partial comments for how to configure the factory
* Removing a.out and boop.txt
ryscheng authored Jun 30, 2024
1 parent ec3131c commit dd51851
Showing 6 changed files with 93 additions and 63 deletions.
Empty file removed a.out
13 changes: 8 additions & 5 deletions apps/docs/docs/contribute/connect-data/dagster-config.mdx
@@ -10,13 +10,14 @@ sidebar_class_name: hidden
When you are ready to deploy,
submit a pull request of your changes to
[OSO](https://github.com/opensource-observer/oso).
OSO maintainers will work with you to get the code In shape for merging.
OSO maintainers will work with you to get the code in shape for merging.
For more details on contributing to OSO, check out
[CONTRIBUTING.md](https://github.com/opensource-observer/oso/blob/main/CONTRIBUTING.md).

### Verify deployment

Give our Dagster deployment 24 hours after your pull request merges to reload.
Our Dagster deployment should automatically recognize the asset
once your pull request is merged to the main branch.
You should be able to find your new asset
in the [global asset list](https://dagster.opensource.observer/assets).

@@ -30,6 +31,7 @@ it may look like this:

![Dagster deployment](./dagster_deployments.png)


### Run it!

If this is your first time adding an asset,
@@ -62,7 +64,9 @@ increased infrastructure instability or unexpected costs.

## Defining a dbt source

In this example, we create a dbt source in `oso/warehouse/dbt/models/`
In order to make the new dataset available to the data pipeline,
you need to add it as a dbt source.
In this example, we create a source in `oso/warehouse/dbt/models/`
(see [source](https://github.com/opensource-observer/oso/blob/main/warehouse/dbt/models/ethereum_sources.yml))
for the Ethereum mainnet
[public dataset](https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-public-dataset-smart-contract-analytics).
@@ -79,7 +83,6 @@ sources:
identifier: traces
```
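
For reference, a complete dbt source definition for these tables might look roughly like the following (an illustrative sketch — see the linked `ethereum_sources.yml` for the authoritative version):

```yaml
# warehouse/dbt/models/ethereum_sources.yml (illustrative sketch)
sources:
  - name: ethereum
    database: bigquery-public-data
    schema: crypto_ethereum
    tables:
      - name: transactions
        identifier: transactions
      - name: traces
        identifier: traces
```
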
This will make it clear to OSO dbt users that the data is available for use.
We can then reference these tables in downstream models with
the `source` macro:

@@ -96,4 +99,4 @@ from {{ source("ethereum", "transactions") }}
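
A downstream staging model that selects from this source might look like the following (a hypothetical sketch — the model path and chosen columns are illustrative, drawn from the public Ethereum transactions schema):

```sql
-- e.g. warehouse/dbt/models/staging/stg_ethereum__transactions.sql (hypothetical path)
select
  block_timestamp,
  from_address,
  to_address,
  value
from {{ source("ethereum", "transactions") }}
```
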

- [**SQL Query Guide**](../../integrate/query-data.mdx): run queries against the data you just loaded
- [**Connect OSO to 3rd Party tools**](../../integrate/3rd-party.mdx): explore your data using tools like Hex.tech, Tableau, and Metabase
- [**Write a dbt model**](../impact-models.md): contribute new impact and data models to our data pipeline
7 changes: 4 additions & 3 deletions apps/docs/docs/contribute/connect-data/dagster.md
@@ -36,9 +36,10 @@ At a high-level, there are 2 possible pathways:
easily fit in memory.

Assets should be added to
`warehouse/oso_dagster/assets/`.
All assets defined in this directory are automatically
loaded into Dagster.
`warehouse/oso_dagster/assets/` and then imported in
`warehouse/oso_dagster/assets/__init__.py`.
All assets defined in this module are automatically
loaded into Dagster from the main branch of the git repository.
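
As a rough illustration, a minimal hand-written asset module might look like this (the module name, asset name, and body below are hypothetical placeholders, not code from the repository):

```python
# warehouse/oso_dagster/assets/my_source.py (hypothetical example)
from dagster import asset


@asset(key_prefix="my_source")
def my_source_data() -> None:
    # Fetch data from the upstream source and load it into the warehouse here.
    ...
```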

For an example of a custom Dagster asset, check out the
[asset for oss-directory](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets/ossd.py),
80 changes: 67 additions & 13 deletions apps/docs/docs/contribute/connect-data/gcs.md
@@ -1,38 +1,92 @@
---
title: 🏗️ Connect via Google Cloud Storage (GCS)
title: Connect via Google Cloud Storage (GCS)
sidebar_position: 4
---

import NextSteps from "./dagster-config.mdx"

Depending on the data, we may accept data dumps
into our Google Cloud Storage (GCS).
We strongly prefer data partners that can provide
live, regularly updated datasets over static snapshots.
Datasets that use this method require OSO sponsorship
to store the data, because we take on the cost
of converting it into a BigQuery dataset
as well as the associated long-term storage costs.
If you believe your dataset qualifies for sponsorship
by OSO, please reach out to us on
[Discord](https://www.opensource.observer/discord).

## Get write access
If you prefer to handle the data storage yourself, check out the
[Connect via BigQuery guide](./bigquery/index.md).

Coordinate with the OSO engineering team directly on
## Schedule periodic dumps to GCS

First, coordinate with the OSO engineering team directly on
[Discord](https://www.opensource.observer/discord)
so we can grant your Google service account write permissions to
our GCS bucket.

With these access permissions, you should schedule a
cron job to regularly dump new time-partitioned data,
usually in daily or weekly jobs.
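
For example, a daily job could upload a date-partitioned Parquet file with the `google-cloud-storage` client (a minimal sketch; the bucket name mirrors the Gitcoin example below, while the `my_dataset` prefix and file names are placeholders to agree on with the OSO team):

```python
# daily_dump.py -- run on a daily schedule, e.g. via cron (illustrative sketch)
from datetime import date

from google.cloud import storage


def upload_daily_partition(local_file: str) -> None:
    # Uses the service account credentials that were granted write access,
    # e.g. via the GOOGLE_APPLICATION_CREDENTIALS environment variable.
    client = storage.Client()
    bucket = client.bucket("oso-dataset-transfer-bucket")
    # Write each run into its own date-stamped folder.
    blob = bucket.blob(f"my_dataset/{date.today().isoformat()}/data.parquet")
    blob.upload_from_filename(local_file)


if __name__ == "__main__":
    upload_daily_partition("data.parquet")
```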

## Defining a Dagster Asset

:::warning
Coming soon... This section is a work in progress
and will be likely refactored soon.
:::
Next, create a new asset file in
`warehouse/oso_dagster/assets/`.
This file should invoke the GCS asset factory.
For example, you can see this in action for
[Gitcoin passport scores](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets/gitcoin.py):

```python
# warehouse/oso_dagster/assets/gitcoin.py
from ..factories import (
    interval_gcs_import_asset,
    SourceMode,
    Interval,
    IntervalGCSAsset,
)

gitcoin_passport_scores = interval_gcs_import_asset(
    IntervalGCSAsset(
        key_prefix="gitcoin",
        name="passport_scores",
        project_id="opensource-observer",
        bucket_name="oso-dataset-transfer-bucket",
        path_base="passport",
        file_match=r"(?P<interval_timestamp>\d\d\d\d-\d\d-\d\d)/scores.parquet",
        destination_table="passport_scores",
        raw_dataset_name="oso_raw_sources",
        clean_dataset_name="gitcoin",
        interval=Interval.Daily,
        mode=SourceMode.Overwrite,
        retention_days=10,
        format="PARQUET",
    ),
)
```
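
Roughly speaking, this configuration tells the factory to pick up daily `scores.parquet` files under the `passport/` prefix of the `oso-dataset-transfer-bucket` bucket, stage them in the `oso_raw_sources` BigQuery dataset, overwrite the `gitcoin.passport_scores` table on each import, and clean up the GCS files after 10 days.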

To see an example of this in action,
you can look into our Dagster asset for
[Gitcoin passport scores](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py).
For the latest documentation on configuration parameters,
check out the comments in the
[GCS factory](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/factories/gcs.py).

In order for our Dagster deployment to recognize this asset,
you need to import it in
`warehouse/oso_dagster/assets/__init__.py`.

```python
from .dbt import *
from .gitcoin import *
...
```

For more details on defining Dagster assets,
see the [Dagster tutorial](https://docs.dagster.io/tutorial).

## GCS import examples in OSO
### GCS examples in OSO

In the
[OSO monorepo](https://github.com/opensource-observer/oso),
you will find a few examples of using the GCS asset factory:

- [Superchain data](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py)
- [Gitcoin Passport scores](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py)
40 changes: 0 additions & 40 deletions boop.txt

This file was deleted.

16 changes: 14 additions & 2 deletions warehouse/oso_dagster/factories/gcs.py
@@ -25,38 +25,50 @@
from .common import AssetFactoryResponse
from ..utils.bq import ensure_dataset, DatasetOptions


# An enum for specifying time intervals
class Interval(Enum):
    Hourly = 0
    Daily = 1
    Weekly = 2
    Monthly = 3


# Configures how we should handle incoming data
class SourceMode(Enum):
    # Add new time-partitioned data incrementally
    Incremental = 0
    # Overwrite the entire dataset on each import
    Overwrite = 1


@dataclass(kw_only=True)
class BaseGCSAsset:
    name: str
    key_prefix: Optional[str | Sequence[str]] = ""
    # GCP project ID (usually opensource-observer)
    project_id: str
    # GCS bucket name
    bucket_name: str
    # Path prefix within the bucket where incoming files land
    path_base: str
    # Regex for incoming files
    file_match: str
    # Table name nested under the dataset
    destination_table: str
    # BigQuery temporary staging dataset for imports
    raw_dataset_name: str
    # BigQuery destination dataset
    clean_dataset_name: str
    # Format of incoming files (PARQUET preferred)
    format: str = "CSV"
    # Extra keyword arguments forwarded to the Dagster asset
    asset_kwargs: dict = field(default_factory=lambda: {})


@dataclass(kw_only=True)
class IntervalGCSAsset(BaseGCSAsset):
    # How often we should run this job
    interval: Interval
    # Incremental or overwrite
    mode: SourceMode
    # Retention time before deleting GCS files
    retention_days: int

