Reputation: 1082
I would like to have a struct for handling some streams. All streams share the same Item. The logic is the following: I need to create a unique stream that contains all items that came from all other streams. I need also to add "new" stream to the "main" stream. I don't care from which stream the next item comes from.
For doing that, I see there's the select_all
function that should do the logic described above.
pub struct WsPool {
merge: Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>,
}
impl WsPool {
pub fn new() -> Self {
Self {
merge: Arc::new(Mutex::new(SelectAll::new())),
}
}
pub fn add(&self, s: Box<dyn Stream<Item = MyItem> + Send + 'static>) {
let mut merge = self.merge.lock().unwrap();
merge.push(s);
}
pub async fn process(&self) {
loop {
let mut merge = self.merge.lock().unwrap();
let item = merge.await.next();
}
}
}
But I receive these errors:
error[E0277]: `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
--> src/ws_pool.rs:30:24
|
30 | let item = merge.await.next();
| ^^^^^^^^^^^ `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
|
= help: the trait `futures::Future` is not implemented for `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`
note: required by `futures::Future::poll`
--> /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:99:5
|
99 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
error[E0277]: `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)` cannot be unpinned
--> src/ws_pool.rs:17:40
|
17 | merge: Arc::new(Mutex::new(SelectAll::new())),
| ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)`
|
= note: consider using `Box::pin`
= note: required because of the requirements on the impl of `futures::Stream` for `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>`
note: required by `futures::stream::SelectAll::<St>::new`
--> /home/allevo/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.17/src/stream/select_all.rs:47:5
|
47 | pub fn new() -> Self {
| ^^^^^^^^^^^^^^^^^^^^
error[E0599]: the method `push` exists for struct `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`, but its trait bounds were not satisfied
--> src/ws_pool.rs:24:15
|
24 | merge.push(s);
| ^^^^ method cannot be called on `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` due to unsatisfied trait bounds
|
::: /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:172:1
|
172 | / pub struct Box<
173 | | T: ?Sized,
174 | | #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
175 | | >(Unique<T>, A);
| |________________- doesn't satisfy `_: futures::Stream`
|
= note: the following trait bounds were not satisfied:
`Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>: futures::Stream`
What am I doing wrong? Otherwise how can I store multiple streams and iterate over them?
Upvotes: 2
Views: 596
Reputation: 60493
Your first problem is a simple mixup. You want to await
the next value, not the stream itself:
let item = merge.next().await;
The resulting errors are all because SelectAll<Box<dyn Stream + Send + 'static>>
does not implement Stream
. If you look at the impl Stream for SelectAll
, it is constrained that the inner stream type implements Unpin
.
You can fix this by adding it to the bounds:
use std::marker::Unpin;
// vvvvv
Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Unpin + Send + 'static>>>>
Or a better solution is to just pin the streams:
use std::pin::Pin;
// vvv
Arc<Mutex<SelectAll<Pin<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>>
The difference is the latter can accept more Stream
types. You just have to use Box::pin
when adding them.
See it working on the Playground.
Upvotes: 3