Clone any [`Stream`] `S` where `<S as Stream>::Item: Clone`.
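
    use fork_stream::StreamExt as _; // provides `.fork()`
    use futures::StreamExt as _; // provides `.collect()`

    async fn example() {
        let source = futures::stream::iter(0..3);
        let fork1 = source.fork();
        let fork2 = fork1.clone();
        // `collect` returns a future, so it must be awaited and given a target type.
        assert_eq!(fork1.collect::<Vec<_>>().await, vec![0, 1, 2]);
        assert_eq!(fork2.collect::<Vec<_>>().await, vec![0, 1, 2]);
    }
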
- Items polled from the source stream are stored in a buffer.
- When a fork is polled, it either yields a clone of an item from the buffer or polls the source stream for a new item.
- Once no fork can still read an item (either because the forks that had not read it were dropped, or because all of the forks have already yielded it), the item is dropped.
- Whenever possible, items are moved out of the buffer instead of being cloned.
- When all of the forks are dropped, the source stream is dropped.
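
The buffering rules above can be modelled without any async machinery. The following is a minimal sketch, not this library's implementation: `Buffer`, `add_fork`, `next_item`, `drop_fork` and `buffered` are hypothetical names, a `Vec` iterator stands in for the source stream, and forks are plain integer ids with a read cursor each.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Toy model of the shared buffer (all names are hypothetical).
struct Buffer<T> {
    source: std::vec::IntoIter<T>,   // stands in for the source stream
    items: VecDeque<T>,              // items not yet read by every fork
    base: usize,                     // absolute index of `items[0]`
    cursors: BTreeMap<usize, usize>, // fork id -> index of its next item
    next_id: usize,
}

impl<T: Clone> Buffer<T> {
    fn new(source: Vec<T>) -> Self {
        Buffer {
            source: source.into_iter(),
            items: VecDeque::new(),
            base: 0,
            cursors: BTreeMap::new(),
            next_id: 0,
        }
    }

    /// Registers a fork. `Some(id)` copies the position of an existing fork;
    /// `None` starts at the point the source has already reached.
    fn add_fork(&mut self, like: Option<usize>) -> usize {
        let pos = like
            .and_then(|id| self.cursors.get(&id).copied())
            .unwrap_or(self.base + self.items.len());
        let id = self.next_id;
        self.next_id += 1;
        self.cursors.insert(id, pos);
        id
    }

    /// Yields the next item for fork `id`, or `None` once the source is exhausted.
    fn next_item(&mut self, id: usize) -> Option<T> {
        let pos = *self.cursors.get(&id)?;
        if pos == self.base + self.items.len() {
            // Nothing buffered for this fork yet: pull from the source.
            let item = self.source.next()?;
            self.items.push_back(item);
        }
        self.cursors.insert(id, pos + 1);
        if self.cursors.values().all(|&c| c > pos) {
            // Last reader of this item: move it out instead of cloning.
            self.base += 1;
            self.items.pop_front()
        } else {
            Some(self.items[pos - self.base].clone())
        }
    }

    /// Removes a fork and drops every item that no remaining fork can read.
    fn drop_fork(&mut self, id: usize) {
        self.cursors.remove(&id);
        let end = self.base + self.items.len();
        let min = self.cursors.values().copied().min().unwrap_or(end);
        while self.base < min {
            self.items.pop_front();
            self.base += 1;
        }
    }

    /// Number of items currently held in the buffer.
    fn buffered(&self) -> usize {
        self.items.len()
    }
}

/// Drains fork `a`, then fork `b`, reporting what was buffered in between.
fn demo() -> (Vec<i32>, Vec<i32>, usize) {
    let mut buf = Buffer::new(vec![0, 1, 2]);
    let a = buf.add_fork(None);
    let b = buf.add_fork(Some(a));
    let first: Vec<i32> = std::iter::from_fn(|| buf.next_item(a)).collect();
    let held = buf.buffered();
    let second: Vec<i32> = std::iter::from_fn(|| buf.next_item(b)).collect();
    (first, second, held)
}
```

In `demo`, fork `a` receives clones because `b` still has to read every item, while `b`, being the last reader, receives the buffered values by move and leaves the buffer empty.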
Any fork can be downgraded to a [`Weak`], which can later be upgraded back, similar to the [`std::rc::Rc`] or [`std::sync::Arc`] APIs.
This behaves as follows:
- [`Weak`] does not implement [`Stream`] and cannot be polled without being upgraded first.
- When a [`Weak`] is upgraded into a [`Forked`], the resulting [`Forked`] is as advanced as the source stream; i.e. it will not yield any items that had been yielded by any other forks prior to the upgrade.
- If all of the forks had been dropped prior to the upgrade, `Weak::upgrade` returns `None`.
The [`Weak`] API is useful when you want to reuse streams that are expensive to initialize, but also want to drop them when they are not needed.
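
The reuse-or-reinitialize pattern can be illustrated with the standard library's `Arc` and `Weak`, which the API here is modelled on. This is only an analogy under stated assumptions: `Expensive` and `Cache` are hypothetical names, `Expensive` stands in for a costly stream, and the sketch uses `std::sync::Weak` rather than this library's own types.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical stand-in for a stream that is expensive to set up.
struct Expensive {
    id: u32,
}

/// Holds only a weak handle, so the resource is freed once every user drops it.
struct Cache {
    slot: Weak<Expensive>,
    inits: u32, // how many times the resource had to be created
}

impl Cache {
    fn new() -> Self {
        Cache { slot: Weak::new(), inits: 0 }
    }

    /// Reuses the live resource if any strong handle still exists,
    /// otherwise initializes a fresh one.
    fn get(&mut self) -> Arc<Expensive> {
        if let Some(live) = self.slot.upgrade() {
            return live;
        }
        self.inits += 1;
        let fresh = Arc::new(Expensive { id: self.inits });
        self.slot = Arc::downgrade(&fresh);
        fresh
    }
}
```

While at least one caller keeps its handle, `get` returns the same resource; after the last handle is dropped, `upgrade` returns `None` and the next `get` pays the initialization cost again.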
This library implements an API similar to that of `shared_stream`, with a few notable differences:
- Streams produced by this library are [`Send`] and [`Sync`]. For this reason we have to use synchronisation primitives that support it, which may be less performant, but makes this library a more suitable option for async environments.
- `shared_stream` buffers the items for as long as at least one clone of the source stream exists. This library "garbage collects" the items as soon as possible. This comes at the cost of some extra bookkeeping logic, which may be less performant, but makes this library a more suitable option for situations where streams are supposed to be long-lived, such as servers.
- This library provides a [`Weak`] API, see above.
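
What the [`Send`] and [`Sync`] requirement means for shared state can be checked at compile time. The sketch below uses only the standard library; `Arc<Mutex<..>>` is an assumed stand-in for thread-safe shared state and is not a statement about the primitives this library uses internally. `assert_send_sync` and `read_from_two_threads` are hypothetical helper names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Compiles only if `T` can be both moved to and shared between threads.
fn assert_send_sync<T: Send + Sync>() {}

/// Two threads each read a full copy of the same shared buffer.
fn read_from_two_threads() -> Vec<Vec<i32>> {
    assert_send_sync::<Arc<Mutex<Vec<i32>>>>();
    // The single-threaded equivalent is rejected by the compiler:
    // assert_send_sync::<std::rc::Rc<std::cell::RefCell<Vec<i32>>>>();

    let shared = Arc::new(Mutex::new(vec![0, 1, 2]));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // Lock, clone the contents, and release the lock.
                let items = shared.lock().unwrap().clone();
                items
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The lock taken on every access is the kind of cost the first bullet refers to: a handle that can cross threads needs synchronisation that a single-threaded `Rc<RefCell<..>>` design can skip.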