Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Subscription deletion #136

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

ruokun-niu
Copy link
Contributor

Description

Currently, when a continuous query is deleted, the subscription info in the change router is still persisted. This PR focuses on the deletion of subscription when a continuous query is deleted. Specifically, this is achieved by the following steps:

  • When a DELETE command is received in the query worker, the worker will iterate through all of the sources that the query is subscribed to, and then invoke the unsubscription method in the sources' query-api (this is achieved through dapr service invocation).
  • When the unsubscription method is invoked, the query api will publish the following message to the Source's change router:
{"payload":{"queryId":"<query-id>","queryNodeId":"<query-node-id>","source":{"db":"Drasi","table":"SourceUnsubscription"}}}

This message has a similar format as the SourceSubscription message

  • When the change router receives this message, it will format the state entry key and delete it from the dapr state store

Type of change

  • This pull request fixes a bug in Drasi and has an approved issue (issue link required).

Fixes: #issue_number

query-container/query-host/src/source_client.rs Dismissed Show dismissed Hide dismissed
key: &str,
metadata: Option<HashMap<String, String>>,
) -> Result<(), Box<dyn std::error::Error>> {
let addr = "https://127.0.0.1".to_string();

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 6 out of 8 changed files in this pull request and generated no comments.

Files not reviewed (2)
  • query-container/query-host/src/query_worker.rs: Evaluated as low risk
  • sources/shared/query-api/src/api.rs: Evaluated as low risk
Comments suppressed due to low confidence (1)

query-container/query-host/src/models.rs:178

  • The error message 'Failed to unsubscribe' could be more descriptive. Consider including the query ID or subscription ID in the message.
UnsubscribeFailed(String),

let resp = match self
.client
.post(format!("http://{}/unsubscription", app_id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To remain "RESTful", I think this should be a DELETE verb on the subscription/{queryNodeId}/{queryId} route.

state: &Arc<AppState>,
) -> Result<(), axum::http::Response<axum::body::Body>> {
let publisher = &state.publisher;
let unsubscription_event = api::UnsubscriptionEvent {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this just be a ControlEvent with an op of d instead of i on the SourceSubscription table?

@@ -391,6 +391,31 @@ async fn process_changes(
} else {
// TODO - supprt other ops on SourceSubscriptions
}
} else if change["payload"]["source"]["table"] == "SourceUnsubscription" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we refactor these deep nested if else statements into a match statement?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants