Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Subscription deletion #136

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions query-container/query-host/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ pub enum BootstrapError {
Other(Box<dyn Error + Send>),
}


impl BootstrapError {
pub fn fetch_failed(source_id: String, inner: Box<dyn Error + Send>) -> Self {
BootstrapError::FetchFailed { source_id, inner }
Expand All @@ -170,3 +171,11 @@ impl BootstrapError {
BootstrapError::Other(inner)
}
}

#[derive(Error, Debug)]
pub enum UnsubscriptionError {
#[error("Failed to unsubscribe: {0}")]
UnsubscribeFailed(String),
#[error("{0}")]
Other(Box<dyn Error + Send>),
}
7 changes: 7 additions & 0 deletions query-container/query-host/src/query_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ impl QueryWorker {
_ = result_index.clear().await;
_ = archive_index.clear().await;
_ = change_stream.unsubscribe().await;
// Iterate over the subscriptions and unsubscribe from each one
for subscription in &modified_config.sources.subscriptions {
match source_client.unsubscribe(query_container_id.to_string(), query_id.to_string(), subscription.id.to_string()).await {
Ok(_) => {},
Err(err) => log::error!("Error unsubscribing from source {}: {}", subscription.id, err),
};
}
match publisher.publish(
&query_id,
ResultEvent::from_control_signal(
Expand Down
36 changes: 35 additions & 1 deletion query-container/query-host/src/source_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use reqwest_streams::JsonStreamResponse;
use serde_json::json;

use crate::models::BootstrapError;
use crate::models::{BootstrapError, UnsubscriptionError};

#[derive(Debug)]
pub struct SourceClient {
Expand Down Expand Up @@ -84,6 +84,40 @@
}
})
}

pub async fn unsubscribe(
&self,
query_container_id: String,
query_id: String,
subscription_id: String,
) -> Result<(), UnsubscriptionError> {
let app_id = format!("{}-query-api", subscription_id);

let resp = match self
.client
.delete(format!("http://{}/subscription/{}/{}", app_id, query_container_id, query_id))

Check warning

Code scanning / devskim

An HTTP-based URL without TLS was detected. Warning

Insecure URL
.send()
.await
{
Ok(resp) => resp,
Err(e) => {
return Err(UnsubscriptionError::UnsubscribeFailed(format!(
"Failed to unsubscribe from app '{}': {}",
app_id, e
)))
}
};

if !resp.status().is_success() {
return Err(UnsubscriptionError::UnsubscribeFailed(format!(
"Failed to unsubscribe from query node '{}': {}",
query_id,
resp.text().await.unwrap_or_default()
)));
}
Ok(())

}
}

mod v2 {
Expand Down
22 changes: 12 additions & 10 deletions sources/shared/change-router/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 35 additions & 5 deletions sources/shared/change-router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,13 @@ async fn process_changes(
);
debug!("ChangeEvent: {}", change);

// Bootstrap
if change["payload"]["source"]["db"] == "Drasi" {
if change["payload"]["source"]["table"] == "SourceSubscription" {
if change["op"] == "i" {
// Subscription and unsubscription events
if change["payload"]["source"]["db"] == "Drasi"
&& change["payload"]["source"]["table"] == "SourceSubscription"
{
match change["op"].as_str() {
Some("i") => {
// Handle SourceSubscription
info!(
"Activating new SourceSubscription: id:{}",
change["payload"]["after"]["id"]
Expand Down Expand Up @@ -388,7 +391,34 @@ async fn process_changes(
return Err(e);
}
}
} else {
}
Some("d") => {
// Handle unsubscription
let state_key = format!(
"SourceSubscription-{}-{}",
match change["payload"]["before"]["queryNodeId"].as_str() {
Some(query_node_id) => query_node_id,
None =>
return Err(Box::<dyn std::error::Error>::from(
"Error loading queryNodeId from the ChangeEvent"
)),
},
match change["payload"]["before"]["queryId"].as_str() {
Some(query_id) => query_id,
None =>
return Err(Box::<dyn std::error::Error>::from(
"Error loading queryId from the ChangeEvent"
)),
}
);
match state_manager.delete_state(&state_key, None).await {
Ok(_) => info!("Deleted Subscription {} from state store", state_key),
Err(e) => {
return Err(e);
}
}
}
_ => {
// TODO - supprt other ops on SourceSubscriptions
}
}
Expand Down
24 changes: 24 additions & 0 deletions sources/shared/change-router/src/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,28 @@
Err(e) => Err(Box::new(e)),
}
}

pub async fn delete_state(
&self,
key: &str,
metadata: Option<HashMap<String, String>>,
) -> Result<(), Box<dyn std::error::Error>> {
let addr = "https://127.0.0.1".to_string();

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
let mut dapr_client = dapr::Client::<dapr::client::TonicClient>::connect(addr)
.await
.expect("Unable to connect to Dapr");

let response = match dapr_client
.delete_state(&self.store_name, &key.to_string(), metadata)
.await
{
Ok(_) => (),
Err(e) => {
log::error!("Error deleting the Dapr state store: {:?}", e);
return Err(Box::new(e));
}
};

Ok(())
}
}
6 changes: 4 additions & 2 deletions sources/shared/query-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ pub struct ControlEvent {
#[derive(Serialize)]
pub struct SubscriptionPayload {
pub source: Source,
pub before: Option<()>,
pub after: SubscriptionRequest,
#[serde(skip_serializing_if = "Option::is_none")]
pub before: Option<SubscriptionRequest>,
#[serde(skip_serializing_if = "Option::is_none")]
pub after: Option<SubscriptionRequest>,
}

#[derive(Serialize)]
Expand Down
72 changes: 69 additions & 3 deletions sources/shared/query-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
use api::{v2::AcquireRequest, ControlEvent, Source, SubscriptionPayload, SubscriptionRequest};
use async_stream::stream;
use axum::{
extract::State,
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
response::Json,
routing::post,
routing::{delete, post},
Router,
};
use axum_streams::StreamBodyAs;
Expand Down Expand Up @@ -78,6 +78,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let app = Router::new()
.route("/subscription", post(handle_subscription))
.route(
"/subscription/:queryNodeId/:queryId",
delete(handle_unsubscription),
)
.layer(
CorsLayer::new()
.allow_origin(Any)
Expand Down Expand Up @@ -151,6 +155,20 @@ async fn handle_subscription(
}
}

async fn handle_unsubscription(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path((query_node_id, query_id)): Path<(String, String)>,
) -> impl IntoResponse {
log::info!("Creating new unsubscription for query_id: {}", query_id);

if let Err(err) = dispatch_unsubscription_event(&query_node_id, &query_id, &state).await {
return err;
}

"Unsubscription event dispatched".into_response()
}

async fn dispatch_control_event(
subscription_request: &SubscriptionRequest,
state: &Arc<AppState>,
Expand All @@ -165,7 +183,7 @@ async fn dispatch_control_event(
table: "SourceSubscription".to_string(),
},
before: None,
after: subscription_request.clone(),
after: Some(subscription_request.clone()),
},
};
let publisher = &state.publisher;
Expand All @@ -186,6 +204,54 @@ async fn dispatch_control_event(
Ok(())
}

async fn dispatch_unsubscription_event(
query_node_id: &str,
query_id: &str,
state: &Arc<AppState>,
) -> Result<(), axum::http::Response<axum::body::Body>> {
let publisher = &state.publisher;
let request = SubscriptionRequest {
query_id: query_id.to_string(),
query_node_id: query_node_id.to_string(),
node_labels: vec![],
rel_labels: vec![],
};
let control_event = ControlEvent {
op: "d".to_string(),
ts_ms: Utc::now().timestamp_millis() as u64,
payload: SubscriptionPayload {
source: Source {
db: "Drasi".to_string(),
table: "SourceSubscription".to_string(),
},
before: Some(request.clone()),
after: None,
},
};
let unsubscription_event_json = json!([control_event]);
match publisher
.publish(
unsubscription_event_json,
Headers::new(std::collections::HashMap::new()),
)
.await
{
Ok(_) => {
log::info!("Published the unsubscription event");
}
Err(e) => {
log::error!("Error publishing the unsubscription event: {:?}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error publishing the unsubscription event: {:?}", e),
)
.into_response());
}
}

Ok(())
}

async fn acquire_v1(
state: &AppState,
subscription_request: SubscriptionRequest,
Expand Down
Loading