[Fabric E2E Sample] Added updates on notebook and pipeline #959

Open · wants to merge 20 commits into base: feat/e2e-fabric-dataops-sample-v0-2
Commits (20)
887e5ac  Added updates on notebook and pipeline (camaderal, Dec 16, 2024)
d1b3ab2  rename the standardize validation root folder (maye-msft, Dec 17, 2024)
077918b  Merge branch 'feat/e2e-fabric-dataops-sample-v0-2' into kitsune/noteb… (camaderal, Dec 17, 2024)
2d30d7c  Removed rest connection id (camaderal, Dec 17, 2024)
b8d084d  Removed changes to unneeded files (camaderal, Dec 17, 2024)
3a26318  Update 03_transform.ipynb to insert dim tables directly instead of us… (yuna-s, Dec 17, 2024)
6498bee  Linting and added download data to notebook (camaderal, Dec 17, 2024)
efe2e10  Merge branch 'kitsune/notebook_and_pipeline_updates' of https://githu… (camaderal, Dec 17, 2024)
6bcb4d4  Fixed linting errors (camaderal, Dec 18, 2024)
11d5a14  Fixed linting errors (camaderal, Dec 18, 2024)
90e8eb9  Fixed some issues with logging (camaderal, Dec 19, 2024)
6e25dee  Merge branch 'feat/e2e-fabric-dataops-sample-v0-2' into kitsune/noteb… (camaderal, Dec 19, 2024)
f169068  Merge remote-tracking branch 'origin/feat/e2e-fabric-dataops-sample-v… (promisinganuj, Dec 23, 2024)
5e2cecd  Merge remote-tracking branch 'origin/feat/e2e-fabric-dataops-sample-v… (promisinganuj, Dec 24, 2024)
506f625  add ddo_transform upload and update tests (yuna-s, Dec 25, 2024)
074405c  notebook updates (yuna-s, Dec 25, 2024)
25a132f  Linting update (yuna-s, Dec 26, 2024)
7d7e916  Update json format for ddo module test to pass lint (yuna-s, Dec 26, 2024)
53af889  Merge pull request #994 from Azure-Samples/kitsune/yuna/upload_ddo_tr… (yuna-s, Jan 2, 2025)
9b20a34  Merge remote-tracking branch 'origin/feat/e2e-fabric-dataops-sample-v… (promisinganuj, Jan 2, 2025)
@@ -25,6 +25,7 @@ workspace_name = <workspace-name>
workspace_id = <workspace-id>
lakehouse_name = <lakehouse-name>
lakehouse_id = <lakehouse-id>
landing_directory = ${setup:adls_shortcut_name}/data/lnd/

[keyvault]
name = <keyvault-name>
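
For context, the `${setup:adls_shortcut_name}` reference in the new `landing_directory` setting is a cross-section interpolation. A minimal sketch of how it would resolve, assuming the file is consumed with Python's configparser and ExtendedInterpolation; the `[setup]` section, the `[fabric]` section name, and the shortcut value shown here are illustrative and not part of this diff:

import configparser

# ExtendedInterpolation enables the ${section:option} reference syntax.
parser = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
parser.read_string("""
[setup]
adls_shortcut_name = sc-adls-main

[fabric]
landing_directory = ${setup:adls_shortcut_name}/data/lnd/
""")

# The cross-section reference is expanded on access:
print(parser["fabric"]["landing_directory"])  # sc-adls-main/data/lnd/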
@@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-

"""Standardization transforms for the parking bay and sensor datasets."""

import datetime
from typing import Tuple

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit, to_timestamp
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType


def get_schema(schema_name: str) -> StructType:
    """Return the Spark schema for the named raw input dataset."""
if schema_name == "in_parkingbay_schema":
schema = StructType(
[
StructField(
"the_geom",
StructType(
[
StructField("coordinates", ArrayType(ArrayType(ArrayType(ArrayType(DoubleType()))))),
StructField("type", StringType()),
]
),
),
StructField("marker_id", StringType()),
StructField("meter_id", StringType()),
StructField("bay_id", StringType(), False),
StructField("last_edit", StringType()),
StructField("rd_seg_id", StringType()),
StructField("rd_seg_dsc", StringType()),
]
)
elif schema_name == "in_sensordata_schema":
schema = StructType(
[
StructField("bay_id", StringType(), False),
StructField("st_marker_id", StringType()),
StructField("status", StringType()),
StructField(
"location",
StructType(
[StructField("coordinates", ArrayType(DoubleType())), StructField("type", StringType())]
),
),
StructField("lat", StringType()),
StructField("lon", StringType()),
]
        )
    else:
        raise ValueError(f"Unknown schema name: {schema_name}")
    return schema
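

# Illustrative usage (an assumption, not part of the original module): the
# returned schema can be applied when reading the raw landing files, e.g. with
# a SparkSession named `spark` and the `landing_directory` config value above:
#
#     parkingbay_sdf = (
#         spark.read.schema(get_schema("in_parkingbay_schema"))
#         .json(f"{landing_directory}parking_bay_data.json")
#     )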


def standardize_parking_bay(
    parkingbay_sdf: DataFrame, load_id: str, loaded_on: datetime.datetime
) -> Tuple[DataFrame, DataFrame]:
    """Standardize raw parking bay data and split it into good and bad records."""
t_parkingbay_sdf = (
parkingbay_sdf.drop_duplicates(["bay_id"])
.withColumn("last_edit", to_timestamp("last_edit", "yyyyMMddHHmmss"))
.select(
col("bay_id").cast("int").alias("bay_id"),
"last_edit",
"marker_id",
"meter_id",
"rd_seg_dsc",
col("rd_seg_id").cast("int").alias("rd_seg_id"),
"the_geom",
lit(load_id).alias("load_id"),
lit(loaded_on.isoformat()).cast("timestamp").alias("loaded_on"),
)
    ).cache()  # cached: the good/bad filters below reuse this frame
    # Data validation: quarantine records with a missing bay_id.
good_records = t_parkingbay_sdf.filter(col("bay_id").isNotNull())
bad_records = t_parkingbay_sdf.filter(col("bay_id").isNull())
return good_records, bad_records


def standardize_sensordata(
    sensordata_sdf: DataFrame, load_id: str, loaded_on: datetime.datetime
) -> Tuple[DataFrame, DataFrame]:
    """Standardize raw sensor data and split it into good and bad records."""
t_sensordata_sdf = (
sensordata_sdf.select(
col("bay_id").cast("int").alias("bay_id"),
"st_marker_id",
col("lat").cast("float").alias("lat"),
col("lon").cast("float").alias("lon"),
"location",
"status",
lit(load_id).alias("load_id"),
lit(loaded_on.isoformat()).cast("timestamp").alias("loaded_on"),
)
    ).cache()  # cached: the good/bad filters below reuse this frame
    # Data validation: quarantine records with a missing bay_id.
good_records = t_sensordata_sdf.filter(col("bay_id").isNotNull())
bad_records = t_sensordata_sdf.filter(col("bay_id").isNull())
return good_records, bad_records
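
For reference, a minimal end-to-end sketch of how these helpers fit together. It assumes a SparkSession named spark, plus an illustrative landing path, load metadata, and output table names; none of these come from this diff.

import datetime

# Read the raw parking bay data with its declared schema (path is illustrative).
parkingbay_raw = (
    spark.read.schema(get_schema("in_parkingbay_schema"))
    .json("sc-adls-main/data/lnd/parking_bay_data.json")
)

# Standardize and split into good and bad records.
good, bad = standardize_parking_bay(
    parkingbay_raw,
    load_id="manual-test-run",
    loaded_on=datetime.datetime.now(datetime.timezone.utc),
)

# Persist each side separately so malformed rows stay inspectable.
good.write.mode("append").saveAsTable("interim_parking_bay")
bad.write.mode("append").saveAsTable("malformed_parking_bay")

The .cache() inside each standardize function keeps the transformed frame in memory, so the two bay_id filters read it once instead of recomputing the full lineage twice.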