Skip to content

Commit

Permalink
attempt to generalize the collection projection
Browse files Browse the repository at this point in the history
  • Loading branch information
matteius committed Oct 2, 2024
1 parent 8e9f278 commit d93a9e1
Showing 1 changed file with 35 additions and 10 deletions.
45 changes: 35 additions & 10 deletions opensensor/collection_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,13 @@ def get_nested_fields(model: Type[BaseModel]):
def create_nested_pipeline(model: Type[BaseModel], prefix=""):
nested_fields = get_nested_fields(model)
pipeline = {}
match_conditions = {}

for field_name, field_type in model.__fields__.items():
if field_name == "timestamp":
pipeline[field_name] = f"${prefix}timestamp"
continue

lookup_field = (
model.collection_name() if hasattr(model, "collection_name") else model.__name__
)
Expand All @@ -212,21 +217,36 @@ def create_nested_pipeline(model: Type[BaseModel], prefix=""):

if field_name in nested_fields:
if get_origin(field_type.type_) is List:
nested_pipeline, nested_match = create_nested_pipeline(
nested_fields[field_name], "$$item."
)
pipeline[field_name] = {
"$map": {
"input": f"${full_field_name}",
"input": {
"$filter": {
"input": f"${full_field_name}",
"as": "item",
"cond": nested_match,
}
},
"as": "item",
"in": create_nested_pipeline(nested_fields[field_name], "$$item."),
"in": nested_pipeline,
}
}
match_conditions[full_field_name] = {"$exists": True, "$ne": []}
else:
pipeline[field_name] = create_nested_pipeline(
nested_pipeline, nested_match = create_nested_pipeline(
nested_fields[field_name], f"{full_field_name}."
)
pipeline[field_name] = nested_pipeline
match_conditions.update(
{f"{full_field_name}.{k}": v for k, v in nested_match.items()}
)
else:
pipeline[field_name] = f"${full_field_name}"
match_conditions[full_field_name] = {"$exists": True}

return pipeline
return pipeline, match_conditions


def create_model_instance(model: Type[BaseModel], data: dict):
Expand Down Expand Up @@ -360,16 +380,21 @@ def get_uniform_sample_pipeline(
if end_date is None:
end_date = datetime.utcnow()
sampling_interval = timedelta(minutes=resolution)
match_clause = get_initial_match_clause(device_ids, device_name, start_date, end_date)

# Create a generalized project pipeline
project_pipeline = create_nested_pipeline(response_model)
project_pipeline["timestamp"] = "$timestamp"
# Create a generalized project pipeline and match conditions
project_pipeline, match_conditions = create_nested_pipeline(response_model)

logger.info(f"Project pipeline for {response_model.__name__}: {project_pipeline}")
# Add timestamp and device metadata conditions to match_conditions
match_conditions.update(
{
"timestamp": {"$gte": start_date, "$lte": end_date},
"metadata.device_id": {"$in": device_ids},
"metadata.name": device_name,
}
)

pipeline = [
{"$match": match_clause},
{"$match": match_conditions},
{
"$addFields": {
"group": {
Expand Down

0 comments on commit d93a9e1

Please sign in to comment.