Skip to content

Commit

Permalink
fix(daemon): limit number of git children (#9572)
Browse files Browse the repository at this point in the history
### Description

This PR limits the amount of `git` child sub processes that the daemon
can start at one time so we don't flood the system with requests when
many things change e.g. a during a rebase.

### Testing Instructions

Added quick unit test to make sure we're using semaphore correctly. Did
a manual test on a big repo and confirmed that on big changes and made
sure that the number of `git` processes remain in the single digits now.
  • Loading branch information
chris-olszewski authored Dec 5, 2024
1 parent 4d02c7f commit e7151e0
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ lazy_static = "1.4.0"
merge = "0.1.0"
mime = "0.3.16"
notify = "6.1.1"
num_cpus = "1.15.0"
once_cell = "1.17.1"
owo-colors = "3.5.0"
parking_lot = "0.12.1"
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = { version = "0.3.26" }
itertools = { workspace = true }
nibble_vec = "0.1.0"
notify = { workspace = true }
num_cpus = { workspace = true }
radix_trie = { workspace = true }
thiserror = "1.0.38"
tokio = { workspace = true, features = ["full", "time"] }
Expand Down
23 changes: 18 additions & 5 deletions crates/turborepo-filewatch/src/hash_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
debouncer::Debouncer,
globwatcher::{GlobError, GlobSet},
package_watcher::DiscoveryData,
scm_resource::SCMResource,
NotifyError, OptionalWatch,
};

Expand Down Expand Up @@ -155,7 +156,7 @@ struct Subscriber {
repo_root: AbsoluteSystemPathBuf,
package_discovery: watch::Receiver<Option<DiscoveryData>>,
query_rx: mpsc::Receiver<Query>,
scm: SCM,
scm: SCMResource,
next_version: AtomicUsize,
}

Expand Down Expand Up @@ -327,7 +328,7 @@ impl Subscriber {
Self {
repo_root,
package_discovery,
scm,
scm: SCMResource::new(scm),
query_rx,
next_version: AtomicUsize::new(0),
}
Expand Down Expand Up @@ -532,19 +533,31 @@ impl Subscriber {
let debouncer_copy = debouncer.clone();
tokio::task::spawn(async move {
debouncer_copy.debounce().await;
let scm_permit = scm.acquire_scm().await;
// We awkwardly copy the actual SCM instance since we're sending it to a
// different thread which requires it be 'static.
let scm_instance = scm_permit.clone();
// Package hashing involves blocking IO calls, so run on a blocking thread.
tokio::task::spawn_blocking(move || {
let blocking_handle = tokio::task::spawn_blocking(move || {
let telemetry = None;
let inputs = spec.inputs.as_inputs();
let result =
scm.get_package_file_hashes(&repo_root, &spec.package_path, &inputs, telemetry);
let result = scm_instance.get_package_file_hashes(
&repo_root,
&spec.package_path,
&inputs,
telemetry,
);
trace!("hashing complete for {:?}", spec);
let _ = tx.blocking_send(HashUpdate {
spec,
version,
result,
});
});
// We wait for the git task to finish so `scm_permit` only gets dropped once the
// resource is no longer being used.
// We should not shut down if a SCM task panics
blocking_handle.await.ok();
});
(version, debouncer)
}
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub mod globwatcher;
pub mod hash_watcher;
mod optional_watch;
pub mod package_watcher;
mod scm_resource;

pub use optional_watch::OptionalWatch;

Expand Down
78 changes: 78 additions & 0 deletions crates/turborepo-filewatch/src/scm_resource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::{ops::Deref, sync::Arc};

use tokio::sync::{Semaphore, SemaphorePermit};
use turborepo_scm::SCM;

#[derive(Debug, Clone)]
pub struct SCMResource {
scm: SCM,
semaphore: Arc<Semaphore>,
}

pub struct SCMPermit<'a> {
scm: &'a SCM,
_permit: SemaphorePermit<'a>,
}

impl SCMResource {
pub fn new(scm: SCM) -> Self {
// We want to only take at most NUM_CPUS - 3 for git processes.
// Accounting for the `turbo` process itself and the daemon this leaves one core
// available for the rest of the system.
let num_permits = num_cpus::get().saturating_sub(3).max(1);
Self::new_with_permits(scm, num_permits)
}

fn new_with_permits(scm: SCM, num_permits: usize) -> Self {
let semaphore = Arc::new(Semaphore::new(num_permits));
Self { scm, semaphore }
}

pub async fn acquire_scm(&self) -> SCMPermit {
let _permit = self
.semaphore
.acquire()
.await
.expect("semaphore should not be closed");
SCMPermit {
scm: &self.scm,
_permit,
}
}
}

impl<'a> Deref for SCMPermit<'a> {
type Target = SCM;

fn deref(&self) -> &Self::Target {
self.scm
}
}

#[cfg(test)]
mod test {
use tokio::sync::oneshot;

use super::*;

#[tokio::test]
async fn test_limits_access() {
let scm = SCMResource::new_with_permits(SCM::Manual, 1);
let scm_copy = scm.clone();
let permit_1 = scm.acquire_scm().await;
let (other_tx, mut other_rx) = oneshot::channel();
tokio::task::spawn(async move {
let _permit_2 = scm_copy.acquire_scm().await;
other_tx.send(()).ok();
});
assert!(
other_rx.try_recv().is_err(),
"other should not have gotten a scm permit"
);
drop(permit_1);
assert!(
other_rx.await.is_ok(),
"other should have gotten permit and exited"
);
}
}
2 changes: 1 addition & 1 deletion crates/turborepo-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ merge = { workspace = true }
miette = { workspace = true, features = ["fancy"] }
nix = "0.26.2"
notify = { workspace = true }
num_cpus = "1.15.0"
num_cpus = { workspace = true }
owo-colors = { workspace = true }
path-clean = "1.0.1"
petgraph = { workspace = true }
Expand Down

0 comments on commit e7151e0

Please sign in to comment.