From 73eacd28b4b48910a4674e1461da65b1524275cf Mon Sep 17 00:00:00 2001
From: George Li
Date: Wed, 20 Nov 2024 09:03:16 -0500
Subject: [PATCH] normalization cross-date bug fixes (#30)

* normalization cross-date bug fixes

* normalization clean up logging

* flake8
---
 src/normalize/norm_utils.py |  4 +--
 src/normalize/normalize.py  | 54 ++++++++++++++++++++++++-------------
 2 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/src/normalize/norm_utils.py b/src/normalize/norm_utils.py
index 4412144..0692e1d 100644
--- a/src/normalize/norm_utils.py
+++ b/src/normalize/norm_utils.py
@@ -4,7 +4,6 @@
 
 
 def check_config(config: dict):
-    print(config)
     assert config["s3_bucket"]["uri"]
     assert config["s3_bucket"]["public_key"]
     assert config["s3_bucket"]["secret_key"]
@@ -23,8 +22,9 @@ def load_config(path: str):
 def get_last_processed_timestamp(s3, bucket, key):
     try:
         response = s3.get_object(Bucket=bucket, Key=key)
+        last_processed_epoch_timestamp = float(json.loads(response["Body"].read())["last_processed"])
         return dt.datetime.fromtimestamp(
-            float(json.loads(response["Body"].read())["last_processed"])
+            last_processed_epoch_timestamp, dt.timezone.utc
         )
     except s3.exceptions.NoSuchKey:
         return None
diff --git a/src/normalize/normalize.py b/src/normalize/normalize.py
index 7ea0b21..02169f5 100644
--- a/src/normalize/normalize.py
+++ b/src/normalize/normalize.py
@@ -91,17 +91,19 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_
     if last_processed:
         LOGGER.info(f"Loaded last_processed timestamp of {last_processed}")
     else:
+        last_processed = pytz.UTC.localize(parser.parse(start_date))
         LOGGER.info(
             f"No state information found at {state_file},"
-            f" defaulting `last_processed={start_date}"
+            f" defaulting `last_processed={last_processed}"
         )
-        last_processed = parser.parse(start_date)
 
     # List objects in the source bucket
     paginator = s3.get_paginator("list_objects_v2")
 
-    cur_time = dt.datetime.utcnow().astimezone(pytz.UTC)
-    cur_processing = last_processed.astimezone(pytz.UTC)
+    cur_time = pytz.UTC.localize(dt.datetime.utcnow())
+    cur_processing = last_processed
+
+    global_data_written = False
     while cur_processing <= cur_time:
         date_partition = os.path.join(
             source_key,
@@ -112,7 +114,7 @@
 
         LOGGER.info(f"Processing date: {date_partition}")
         max_epoch_timestamp = cur_processing.timestamp()
-        for page in paginator.paginate(Bucket=source_bucket, Prefix=date_partition, PaginationConfig={'PageSize': 60}):
+        for page in paginator.paginate(Bucket=source_bucket, Prefix=date_partition, PaginationConfig={'PageSize': 30}):
             trip_updates_pa = None
             vehicles_pa = None
             alerts_pa = None
@@ -154,32 +156,48 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_
 
                 max_epoch_timestamp = max(max_epoch_timestamp, file_write_epoch_time)
 
+            new_data_written = False
             if trip_updates_pa:
                 s3_uri = f"{destination_prefix}/trip-updates"
-                LOGGER.info(f"Writing {trip_updates_pa.num_rows} entries to {s3_uri}")
+                time_range = pa.compute.min_max(trip_updates_pa['time'])
+                LOGGER.info(
+                    f"Writing {trip_updates_pa.num_rows} entries to {s3_uri}. "
+                    f"Min timestamp {time_range['min']}, max timestamp {time_range['max']}"
+                )
                 write_data(s3_fs, trip_updates_pa, s3_uri)
+                new_data_written = True
             if vehicles_pa:
                 s3_uri = f"{destination_prefix}/vehicles"
-                LOGGER.info(f"Writing {vehicles_pa.num_rows} entries to {s3_uri}")
+                time_range = pa.compute.min_max(vehicles_pa['time'])
+                LOGGER.info(
+                    f"Writing {vehicles_pa.num_rows} entries to {s3_uri}. "
+                    f"Min timestamp {time_range['min']}, max timestamp {time_range['max']}"
+                )
                 write_data(s3_fs, vehicles_pa, s3_uri)
+                new_data_written = True
             if alerts_pa:
                 s3_uri = f"{destination_prefix}/alerts"
-                LOGGER.info(f"Writing {alerts_pa.num_rows} entries to {s3_uri}")
+                time_range = pa.compute.min_max(alerts_pa['time'])
+                LOGGER.info(
+                    f"Writing {alerts_pa.num_rows} entries to {s3_uri}. "
+                    f"Min timestamp {time_range['min']}, max timestamp {time_range['max']}"
+                )
                 write_data(s3_fs, alerts_pa, s3_uri)
-
-        # Update the last processed timestamp
-        if max_epoch_timestamp == last_processed:
-            LOGGER.warning(
-                f"No data found in partition: {date_partition} "
-                f"- is this expected?"
+                new_data_written = True
+            if new_data_written:
+                global_data_written = True
+                LOGGER.info(
+                    f"Updating last processed timestamp to "
+                    f"maximum file timestamp: {dt.datetime.utcfromtimestamp(max_epoch_timestamp).isoformat()}"
                 )
-        LOGGER.info(
-            f"Updating last processed timestamp to "
-            f"maximum file timestamp: {dt.datetime.utcfromtimestamp(max_epoch_timestamp).isoformat()}"
-        )
         update_last_processed_timestamp(s3, state_bucket, state_key, max_epoch_timestamp)
         cur_processing = (cur_processing + dt.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
 
+    if not global_data_written:
+        LOGGER.warning(
+            "No new data written - is this expected?"
+        )
+
 
 @click.command()
 @click.option("-f", "--feed_id", required=True, type=str, help="feed ID to be scraped")