[WIP] Implementation of atomic_wait / atomic_notify #589

Open · wants to merge 1 commit into base: dev/wasm-threads
Conversation

resulknad (Collaborator) commented:

We discussed a solution that supports multiple threads with a single store while we wait for shared-everything-threads to land. This is opposed to the instance-per-thread solution as implemented in wasi-threads.

This commit is still WIP, but maybe a good point to pause and have a discussion of where we will take this.

Right now it contains:

  • Adding support for multi-threaded tests in spec.rs
  • Thread-safe module instantiation
  • Very basic wait / notify implementation

This is somewhat working: it passes the wait/notify test cases. Right now we only ever have one thread per module instantiation, which makes things easier to reason about.

I wonder though, in the desired use case, does that assumption still hold?

@resulknad requested a review from @ia0 as a code owner on August 23, 2024 at 16:37
ia0 (Member) left a comment:

Thanks for the PR! It's much easier to discuss with code.

> Right now we only ever have one thread per module instantiation, which makes things easier to reason about.

Interesting, I was imagining it with a single module instantiation because of the use case I have in mind (see below). But I understand that to be able to use the existing test suite, we kind of have to stay close to the instance-per-thread design, since it's written using different modules for different threads.

> I wonder though, in the desired use case, does that assumption still hold?

The use-case I have in mind is something like:

  • Each applet has its own store (right now there's only one applet, so one store).
  • The store may run multiple threads (in practice, it will probably be at most 2: "main" and "interrupt").
  • Threads don't run in parallel, only concurrently (we only target single-core CPUs for now). But if we can avoid using this assumption, we should. In particular, we can assume the interpreter will only switch threads at very specific points (like calling a host function).
  • Threads have a priority given by their creation time (most recent has higher priority). The highest priority non-waiting thread runs (if running in parallel, then the top N highest priority non-waiting threads).
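The priority rule in the last bullet can be sketched as follows. This is a minimal illustration only, not the interpreter's actual code: the Thread struct and the pick_next function are made up for this example.

```rust
// Hypothetical sketch of the scheduling rule: threads are ordered by
// creation time, the most recently created has the highest priority,
// and the scheduler runs the highest-priority non-waiting thread.

#[derive(Debug)]
struct Thread {
    creation_time: u64, // later creation = higher priority
    waiting: bool,      // true while blocked in atomic_wait
}

/// Returns the index of the thread to run next, if any thread is runnable.
fn pick_next(threads: &[Thread]) -> Option<usize> {
    threads
        .iter()
        .enumerate()
        .filter(|(_, t)| !t.waiting)
        .max_by_key(|(_, t)| t.creation_time)
        .map(|(i, _)| i)
}

fn main() {
    let threads = [
        Thread { creation_time: 0, waiting: false }, // "main"
        Thread { creation_time: 1, waiting: true },  // "interrupt", waiting
    ];
    // The interrupt thread has higher priority but is waiting,
    // so the main thread is picked.
    assert_eq!(pick_next(&threads), Some(0));
}
```

For parallel execution, the same rule generalizes by taking the top N non-waiting threads by creation time instead of just the maximum.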

We would probably need to write our own tests if we go that way; I don't think we need more than a few. This feature is going to be experimental until the shared-everything-threads proposal becomes concrete.

Actually, thinking about it, we might be able to run the test suite with multiple threads per store. It's just that we won't use it like that. But it should support instantiating different modules and invoking them concurrently.

We can discuss the design in more detail in another channel if you want.

@@ -500,8 +507,8 @@ struct Continuation<'m> {
}

impl<'m> Store<'m> {
fn last_inst(&mut self) -> &mut Instance<'m> {
self.insts.last_mut().unwrap()
fn last_inst(&mut self, inst_id: usize) -> &mut Instance<'m> {
ia0 (Member) commented:
An inst() method should be added instead.
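Something like this sketch of the suggested accessor; the Instance fields and the Store shape here are simplified placeholders, not the real interpreter types:

```rust
// Placeholder instance type for this sketch.
struct Instance {
    name: &'static str,
}

struct Store {
    insts: Vec<Instance>,
}

impl Store {
    // Suggested accessor: look an instance up by its id instead of
    // always returning the last one.
    fn inst(&mut self, inst_id: usize) -> &mut Instance {
        &mut self.insts[inst_id]
    }
}

fn main() {
    let mut store = Store {
        insts: vec![Instance { name: "a" }, Instance { name: "b" }],
    };
    assert_eq!(store.inst(0).name, "a");
    assert_eq!(store.inst(1).name, "b");
}
```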

self.last_inst().module = module;
for import in self.last_inst().module.imports() {
let type_ = import.type_(&self.last_inst().module);
drop(lock);
ia0 (Member) commented:
Is there a reason not to take the lock for the whole instantiation? It seems it would simplify things and we don't need concurrent instantiation.
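For illustration, holding the guard for the whole instantiation could look like this. This is a simplified sketch: std::sync::Mutex stands in for spin::Mutex, and Store and instantiate are stand-ins for the real types.

```rust
use std::sync::Mutex;

struct Store {
    // Hypothetical instance list, protected by the mutex.
    insts: Mutex<Vec<String>>,
}

// The guard is taken once at the top and released only when the
// function returns, so every step of instantiation runs under the lock
// and no intermediate drop(lock) is needed.
fn instantiate(store: &Store, name: &str) {
    let mut insts = store.insts.lock().unwrap();
    insts.push(name.to_string());
    // ... resolve imports, initialize memories, etc., still locked ...
} // lock released here, at the end of instantiation

fn main() {
    let store = Store { insts: Mutex::new(Vec::new()) };
    instantiate(&store, "module-a");
    assert_eq!(store.insts.lock().unwrap().len(), 1);
}
```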

@@ -61,6 +61,7 @@ pub struct Store<'m> {
// functions in `funcs` is stored to limit normal linking to that part.
func_default: Option<(&'m str, usize)>,
threads: Vec<Continuation<'m>>,
lock: spin::Mutex<()>,
ia0 (Member) commented:
Is there a reason not to split the store into mutable and immutable part and put the mutable part in the mutex?

Maybe the threads field could become Mutex<Vec<Mutex<Vec<Continuation>>>>, where the outer Mutex<Vec<_>> holds the different concurrent threads and the inner Mutex<Vec<_>> is the "call stack" (with respect to host functions) of the given thread.
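A toy version of that shape, to make the nesting concrete: std::sync::Mutex stands in for spin::Mutex, and Continuation is a placeholder for the interpreter's type.

```rust
use std::sync::Mutex;

// Placeholder for the interpreter's Continuation type.
struct Continuation {
    frame: &'static str,
}

// Outer Mutex<Vec<_>>: the set of concurrent threads.
// Inner Mutex<Vec<Continuation>>: one thread's call stack with respect
// to host functions.
type Threads = Mutex<Vec<Mutex<Vec<Continuation>>>>;

fn main() {
    let threads: Threads = Mutex::new(Vec::new());
    // Spawning a thread pushes a fresh, empty call stack.
    threads.lock().unwrap().push(Mutex::new(Vec::new()));
    // A host call on thread 0 pushes a continuation onto its stack.
    threads.lock().unwrap()[0]
        .lock()
        .unwrap()
        .push(Continuation { frame: "host-call" });
    let outer = threads.lock().unwrap();
    let stack = outer[0].lock().unwrap();
    assert_eq!(stack.last().unwrap().frame, "host-call");
}
```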

None => return Err(trap()),
Some(x) => x,
};
self.push_value(count);

let notified = mem.share_data.notify(_mem, count.unwrap_i32());
ia0 (Member) commented:
nit: the `_` prefix is for unused variables; since `_mem` is used here, the underscore can be removed.

#[derive(Debug, Default)]
struct ShareData {
queue: Vec<WaitingThread>,
lock: spin::Mutex<()>,
ia0 (Member) commented:
Same here: in Rust, the data protected by a mutex usually lives inside the mutex.
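For example, a sketch with std::sync::Mutex in place of spin::Mutex; the WaitingThread field is hypothetical:

```rust
use std::sync::Mutex;

// Hypothetical waiter record for this sketch.
#[derive(Debug, Default)]
struct WaitingThread {
    thread_id: usize,
}

// Instead of a separate `lock: Mutex<()>` next to `queue`, the queue
// itself lives inside the mutex, so it can only be touched while the
// lock is held.
#[derive(Debug, Default)]
struct ShareData {
    queue: Mutex<Vec<WaitingThread>>,
}

fn main() {
    let data = ShareData::default();
    data.queue.lock().unwrap().push(WaitingThread { thread_id: 0 });
    assert_eq!(data.queue.lock().unwrap().len(), 1);
}
```

This makes it impossible to read or mutate the queue without taking the lock, which the separate `Mutex<()>` pattern cannot enforce.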

Comment on lines 1757 to +1760
#[cfg(feature = "threads")]
share: bool,
#[cfg(feature = "threads")]
share_data: ShareData,
ia0 (Member) commented:
nit: if it doesn't make sense to have a ShareData for non-shared memory, then we should use an Option.

Suggested change
#[cfg(feature = "threads")]
share: bool,
#[cfg(feature = "threads")]
share_data: ShareData,
#[cfg(feature = "threads")]
share: Option<ShareData>,

assert!(env.inst.is_ok());
for directive in wast.directives {
let pool =
unsafe { std::slice::from_raw_parts_mut(std::alloc::alloc_zeroed(layout), layout.size()) };
ia0 (Member) commented:
Do we need to zero here? We should already do it in exec.rs.

let pool =
unsafe { std::slice::from_raw_parts_mut(std::alloc::alloc_zeroed(layout), layout.size()) };
let mut env_arc = Arc::new(Env::new(pool));
let first_call = root_env.is_some();
ia0 (Member) commented:
This looks weird. I would expect is_none given the variable name first_call.

println!("DBK wast-thread {:?}", thread.name);
threads.push(thread);
}
WastDirective::Wait { span, thread } => {
ia0 (Member) commented:
Shouldn't we wait only on the thread id provided? Also, it looks like the threads are spawned here and not when declared. I tried to check the reference interpreter, and my understanding is that threads may execute as soon as they are declared. The interpreter randomly chooses which task to execute (it doesn't seem multi-threaded).

env_arc = root_env.unwrap();
}
let env_arc_cpy = env_arc.clone();
let env = unsafe { Arc::<Env<'_>>::get_mut_unchecked(env_arc.borrow_mut()) };
ia0 (Member) commented:
This is actually unsound. When we spawn multiple threads, they will each get a mutable reference to the environment.
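One sound alternative is to put the shared environment behind Arc<Mutex<..>> instead of handing out aliasing mutable references via Arc::get_mut_unchecked. This is a sketch: Env here is a stand-in for the test environment, not the real type.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the test environment shared between spawned wast threads.
struct Env {
    counter: u32,
}

fn main() {
    // Each spawned thread gets its own Arc clone; mutation goes through
    // the Mutex, so no two threads ever hold a mutable reference at once.
    let env = Arc::new(Mutex::new(Env { counter: 0 }));
    let mut handles = Vec::new();
    for _ in 0..4 {
        let env = Arc::clone(&env);
        handles.push(thread::spawn(move || {
            env.lock().unwrap().counter += 1;
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(env.lock().unwrap().counter, 4);
}
```

With Arc::get_mut_unchecked, every clone of the Arc can produce a `&mut Env` simultaneously, which is undefined behavior; the Mutex makes the exclusivity explicit and checked.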
