import warnings

warnings.filterwarnings(action="ignore")


def linear_reg_func(granularity):  # "asa" or "vendor"
    # Local imports
    from google.cloud import bigquery
    from google.cloud import bigquery_storage
    import pandas as pd

    # Instantiate the BQ clients used to read from and write to GCP
    client = bigquery.Client(project="logistics-data-staging-flat")
    bqstorage_client = bigquery_storage.BigQueryReadClient()
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
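    # Note: WRITE_TRUNCATE replaces the destination table on every run, so each
    # execution overwrites the previous results rather than appending to them.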

    # Define a helper that fits a straight line through the CVR points and returns its slope
    def model(df, cvr_col):
        from sklearn.linear_model import LinearRegression  # Local import to speed up module loading
        import numpy as np

        data_x = df[["df_total"]].values
        data_y = df[[cvr_col]].values
        lm = LinearRegression()
        lm.fit(X=data_x, y=data_y)
        return float(np.squeeze(lm.coef_))
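    # Hypothetical example: a group where df_total = [0, 1, 2] and the CVR column
    # holds [0.10, 0.08, 0.06] would yield a slope of -0.02, i.e. conversion
    # drops by 0.02 per unit increase in df_total.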

    if granularity == "asa":
        # Download the dataset
        data_query = """SELECT * FROM `dh-logistics-product-ops.pricing.cvr_per_df_bucket_asa_level_loved_brands_scaled_code`"""
        df = client.query(query=data_query).result().to_dataframe(bqstorage_client=bqstorage_client)

        # Get the slopes, keeping only ASAs with more than one tier
        df_slopes = (
            df[df["num_tiers_master_asa"] > 1]
            .groupby(["entity_id", "country_code", "master_asa_id"])
            .apply(model, cvr_col="asa_cvr3_per_df")
            .to_frame(name="asa_cvr3_slope")
        )
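        # groupby().apply(model) yields one slope per group, indexed by the group
        # keys; to_frame() names the column so it can be merged back in below.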
        # Join the results to the original data frame
        df_merged = pd.merge(left=df, right=df_slopes, on=["entity_id", "country_code", "master_asa_id"], how="left")

        # Destination table name
        destination_tbl = "cvr_per_df_bucket_asa_level_loved_brands_scaled_code"
    elif granularity == "vendor":
        # Download the dataset
        data_query = """SELECT * FROM `dh-logistics-product-ops.pricing.cvr_per_df_bucket_vendor_level_loved_brands_scaled_code`"""
        df = client.query(query=data_query).result().to_dataframe(bqstorage_client=bqstorage_client)

        # Get the slopes, keeping only vendors with more than one tier
        df_slopes = (
            df[df["num_tiers_vendor"] > 1]
            .groupby(["entity_id", "country_code", "master_asa_id", "vendor_code"])
            .apply(model, cvr_col="cvr3")
            .to_frame(name="vendor_cvr3_slope")
        )

        # Join the results to the original data frame
        df_merged = pd.merge(left=df, right=df_slopes, on=["entity_id", "country_code", "master_asa_id", "vendor_code"], how="left")

        # Destination table name
        destination_tbl = "cvr_per_df_bucket_vendor_level_loved_brands_scaled_code"
    else:
        raise ValueError(f'granularity must be "asa" or "vendor", got {granularity!r}')

    # Upload the merged data frame to BQ
    client.load_table_from_dataframe(
        dataframe=df_merged.reset_index(drop=True),
        destination=f"dh-logistics-product-ops.pricing.{destination_tbl}",
        job_config=job_config,
    ).result()
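    # load_table_from_dataframe() returns a LoadJob; the trailing .result() call
    # above blocks until the load finishes and raises if the job failed.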


if __name__ == "__main__":
    # Compute and upload the slopes at both granularities
    linear_reg_func(granularity="vendor")
    linear_reg_func(granularity="asa")