-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Add back the async bus #15
base: main
Are you sure you want to change the base?
Conversation
They're not very good tests, but they do illustrate the problems with the current implementation of the futures Sink/Stream.
The implementation is pretty crappy! It's full of TODO comments. I didn't reorganize the code in a way that preserves much of the original semantic clarity, which was definitely the first mistake. It works, and does demonstrate one way of factoring around sometimes blocking and sometimes not. The tests might not be comprehensive enough either. I'm not sure.
@@ -318,7 +451,7 @@ impl<T> Bus<T> { | |||
// we run a separate thread responsible for unparking | |||
// so we don't have to wait for unpark() to return in broadcast_inner | |||
// sending on a channel without contention is cheap, unparking is not | |||
let (unpark_tx, unpark_rx) = mpsc::channel::<thread::Thread>(); | |||
let (unpark_tx, unpark_rx) = mpsc::channel::<P>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might find it easier to get rid of the separate wakeup thread while you're at it. It was probably a poor choice in the first place anyway!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a comment about it.. is the comment untrue? could bench it either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was some discussion on IRC (I think it was talchas) who pointed out that the channel send/receive ends up being about similarly expensive. And I think in the futures
case wakeup is even cheaper.
// no, so block by parking and telling readers to notify on last read | ||
self.state.ring[fence] | ||
.waiting | ||
.replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed); | ||
.replace(Some(Box::new(current)), atomic::Ordering::Relaxed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm really sad about this Box
, but unfortunately I think it's necessary. We could perhaps at least hoist it outside the loop
..?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AtomicOption
requires the Box
though.
well, i'm partway through this and finding some deadlocks in the async code. would hopping back on irc be an option for debugging this? if not, i can post what i have and explain what i'm seeing. |
I'm unfortunately busy today (about to do a Rust live-coding session :D), but could try to get on IRC some time tomorrow. What's your timezone? |
UTC-7. i'm around a lot though! |
Is there still interest in merging this or something like it? It would be useful for a project I'm working on. If the original author doesn't want to work on it, I'd be willing to take a stab at finishing this up and later porting it to std::futures. |
#[cfg(feature = "async")] | ||
impl<T> Bus<T, futures::task::Task> { | ||
/// TODO forwards to with_parkable | ||
pub fn new_async(len: usize) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There really isn't a reason for this to have a different name than Bus<T, Thread>::new
. In most cases, type inference can determine which one should be used (for example, populating a struct field), and other examples can be resolved by specifying the type parameter at construction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given also that this is equivalent to new
, it may even make sense to get rid of these, rename with_parkable
to new
and make it public (it seems to be already, though it was intended to be some kind of implementation detail).
btw, I did make a completely separate reimplementation of this wherein I solved the issues I was running into at the expense of making it not generic to sync/async anymore: https://github.com/habnabit/bus/tree/async_bus there was another comment I made about the overall structure of |
ah, found it: crossbeam-rs/crossbeam-channel#38 (comment) |
I'm taking a stab at writing a Futures 0.3+ version of this. It seems to me that this would alter the architecture even more than @habnabit's async_bus rewrite. The biggest change would be to leverage AtomicWaker, remove the background waker thread/channels, and eliminate (or move, really) thread parking altogether. This would make the API async-first, then the non-blocking calls would be implemented in terms of poll*, and blocking calls would be implemented in futures::block_on or similar. @jonhoo - Is this a direction you'd want the implementation to go? If not, I guess it'd have to be a forever-fork, which doesn't sound pleasant. |
I'm very wary of implementing the sync version of this using something like |
Hey, any news regarding this? |
I ended up implementing the async rewrite, but it did end up basically as a rewrite. After implementing it, I think I agree with @jonhoo. It'd be hard to reconcile these as different flavors of the same implementation. It should just be a separate impl. Some MPMC schemes like After I implemented it, we shifted the architecture of our system and my async version is no longer in use. |
Wow this is incomplete! The tests don't pass per se, but that's on purpose. It does demonstrate the concept and that it can be implemented easily enough. There's even a little code deduplication.