normalization cross-date bug fixes (#30)
* normalization cross-date bug fixes

* normalization clean up logging

* flake8
geeli123 authored Nov 20, 2024
1 parent d9849ee commit 73eacd2
Showing 2 changed files with 38 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/normalize/norm_utils.py
@@ -4,7 +4,6 @@
 
 
 def check_config(config: dict):
-    print(config)
     assert config["s3_bucket"]["uri"]
     assert config["s3_bucket"]["public_key"]
     assert config["s3_bucket"]["secret_key"]
@@ -23,8 +22,9 @@ def load_config(path: str):
 def get_last_processed_timestamp(s3, bucket, key):
     try:
         response = s3.get_object(Bucket=bucket, Key=key)
+        last_processed_epoch_timestamp = float(json.loads(response["Body"].read())["last_processed"])
         return dt.datetime.fromtimestamp(
-            float(json.loads(response["Body"].read())["last_processed"])
+            last_processed_epoch_timestamp, dt.timezone.utc
         )
     except s3.exceptions.NoSuchKey:
         return None
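Why the `norm_utils.py` change matters: without a tz argument, `datetime.fromtimestamp()` interprets the stored epoch in whatever local timezone the job happens to run in and returns a naive datetime, so the saved `last_processed` state could decode to a different instant on different hosts. A minimal sketch of the two behaviors, using a made-up epoch value rather than anything from the repo:

```python
import datetime as dt

# Hypothetical value for the state file's "last_processed" field.
epoch = 1732060800.0  # 2024-11-20T00:00:00Z

# No tz argument: the epoch is rendered in the machine's local timezone and the
# result is naive (tzinfo is None), so later comparisons depend on where this runs.
naive_local = dt.datetime.fromtimestamp(epoch)

# With dt.timezone.utc (as the commit now passes): an aware UTC datetime that
# means the same instant everywhere.
aware_utc = dt.datetime.fromtimestamp(epoch, dt.timezone.utc)

print(naive_local.tzinfo)     # None
print(aware_utc.isoformat())  # 2024-11-20T00:00:00+00:00
```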
54 changes: 36 additions & 18 deletions src/normalize/normalize.py
@@ -91,17 +91,19 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_
     if last_processed:
         LOGGER.info(f"Loaded last_processed timestamp of {last_processed}")
     else:
+        last_processed = pytz.UTC.localize(parser.parse(start_date))
         LOGGER.info(
             f"No state information found at {state_file},"
-            f" defaulting `last_processed={start_date}"
+            f" defaulting `last_processed={last_processed}"
         )
-        last_processed = parser.parse(start_date)
 
     # List objects in the source bucket
     paginator = s3.get_paginator("list_objects_v2")
 
-    cur_time = dt.datetime.utcnow().astimezone(pytz.UTC)
-    cur_processing = last_processed.astimezone(pytz.UTC)
+    cur_time = pytz.UTC.localize(dt.datetime.utcnow())
+    cur_processing = last_processed
 
+    global_data_written = False
     while cur_processing <= cur_time:
         date_partition = os.path.join(
             source_key,
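The cross-date fix in this hunk comes down to how the naive `start_date` and `utcnow()` values were converted: calling `.astimezone(pytz.UTC)` on a naive datetime first assumes it is in the machine's local timezone and shifts it, whereas `pytz.UTC.localize(...)` attaches UTC without moving the wall-clock value. A small sketch of the difference, assuming `python-dateutil` and `pytz` are available and using a made-up start date:

```python
import pytz
from dateutil import parser

start_date = "2024-11-19"         # hypothetical CLI value
naive = parser.parse(start_date)  # naive midnight, no tzinfo

# Old approach: astimezone() treats the naive value as local time, so on a host
# at UTC-5 the processing window starts at 05:00 UTC instead of midnight.
shifted = naive.astimezone(pytz.UTC)

# New approach: localize() pins the same wall-clock time as UTC, so the
# `while cur_processing <= cur_time` loop walks date partitions from midnight UTC.
pinned = pytz.UTC.localize(naive)

print(shifted.isoformat())
print(pinned.isoformat())
```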
@@ -112,7 +114,7 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_
         LOGGER.info(f"Processing date: {date_partition}")
         max_epoch_timestamp = cur_processing.timestamp()
 
-        for page in paginator.paginate(Bucket=source_bucket, Prefix=date_partition, PaginationConfig={'PageSize': 60}):
+        for page in paginator.paginate(Bucket=source_bucket, Prefix=date_partition, PaginationConfig={'PageSize': 30}):
             trip_updates_pa = None
             vehicles_pa = None
             alerts_pa = None
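For context on the `PageSize` change: with a `list_objects_v2` paginator, `PaginationConfig={'PageSize': ...}` only caps how many keys each ListObjectsV2 call returns; the paginator still walks the whole prefix, so a smaller page mainly bounds how much is accumulated into the Arrow tables per iteration. A rough usage sketch with hypothetical bucket and prefix names:

```python
import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# Hypothetical bucket/prefix; PageSize bounds keys per API call, not total results.
pages = paginator.paginate(
    Bucket="example-bucket",
    Prefix="feed/2024/11/20/",
    PaginationConfig={"PageSize": 30},
)
for page in pages:
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["LastModified"])
```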
@@ -154,32 +156,48 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_
 
                 max_epoch_timestamp = max(max_epoch_timestamp, file_write_epoch_time)
 
+            new_data_written = False
             if trip_updates_pa:
                 s3_uri = f"{destination_prefix}/trip-updates"
-                LOGGER.info(f"Writing {trip_updates_pa.num_rows} entries to {s3_uri}")
+                time_range = pa.compute.min_max(trip_updates_pa['time'])
+                LOGGER.info(
+                    f"Writing {trip_updates_pa.num_rows} entries to {s3_uri}. "
+                    f"Min timestamp {time_range['min']}, max timestamp {time_range['max']}"
+                )
                 write_data(s3_fs, trip_updates_pa, s3_uri)
+                new_data_written = True
             if vehicles_pa:
                 s3_uri = f"{destination_prefix}/vehicles"
-                LOGGER.info(f"Writing {vehicles_pa.num_rows} entries to {s3_uri}")
+                time_range = pa.compute.min_max(vehicles_pa['time'])
+                LOGGER.info(
+                    f"Writing {vehicles_pa.num_rows} entries to {s3_uri}. "
+                    f"Min timestamp {time_range['min']}, max timestamp {time_range['max']}"
+                )
                 write_data(s3_fs, vehicles_pa, s3_uri)
+                new_data_written = True
             if alerts_pa:
                 s3_uri = f"{destination_prefix}/alerts"
-                LOGGER.info(f"Writing {alerts_pa.num_rows} entries to {s3_uri}")
+                time_range = pa.compute.min_max(alerts_pa['time'])
+                LOGGER.info(
+                    f"Writing {alerts_pa.num_rows} entries to {s3_uri}. "
+                    f"Min timestamp {time_range['min']}, max timestamp {time_range['max']}"
+                )
                 write_data(s3_fs, alerts_pa, s3_uri)
+                new_data_written = True
 
-        # Update the last processed timestamp
-        if max_epoch_timestamp == last_processed:
-            LOGGER.warning(
-                f"No data found in partition: {date_partition} "
-                f"- is this expected?"
-            )
-        LOGGER.info(
-            f"Updating last processed timestamp to "
-            f"maximum file timestamp: {dt.datetime.utcfromtimestamp(max_epoch_timestamp).isoformat()}"
-        )
-        update_last_processed_timestamp(s3, state_bucket, state_key, max_epoch_timestamp)
+            if new_data_written:
+                global_data_written = True
+                LOGGER.info(
+                    f"Updating last processed timestamp to "
+                    f"maximum file timestamp: {dt.datetime.utcfromtimestamp(max_epoch_timestamp).isoformat()}"
+                )
+                update_last_processed_timestamp(s3, state_bucket, state_key, max_epoch_timestamp)
         cur_processing = (cur_processing + dt.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
 
+    if not global_data_written:
+        LOGGER.warning(
+            "No new data written - is this expected?"
+        )
 
 
 @click.command()
 @click.option("-f", "--feed_id", required=True, type=str, help="feed ID to be scraped")
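Two pieces of the last hunk are worth spelling out: `pa.compute.min_max()` returns a struct-like scalar whose `min`/`max` fields feed the new log lines, and the state file is now only advanced when a page actually wrote rows (`new_data_written`), with `global_data_written` driving a single end-of-run warning instead of a per-partition one. A standalone sketch of both ideas, not code from the repo:

```python
import pyarrow as pa
import pyarrow.compute as pc

# Hypothetical stand-in for one of the trip-updates/vehicles/alerts tables.
table = pa.table({"time": pa.array([1732060800, 1732060815, 1732060830])})

new_data_written = table.num_rows > 0
if new_data_written:
    # min_max() yields a scalar with "min" and "max" fields, as interpolated
    # into the new "Writing ... entries" log messages.
    time_range = pc.min_max(table["time"])
    print(f"Min timestamp {time_range['min']}, max timestamp {time_range['max']}")

    # Only now would the stored timestamp be advanced; the repo's
    # update_last_processed_timestamp() helper is not reproduced here.
    max_epoch_timestamp = time_range["max"].as_py()
else:
    print("No new data written - is this expected?")
```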
