allevo
allevo

Reputation: 1082

push on SelectAll Stream

I would like to have a struct for handling some streams. All streams share the same Item. The logic is the following: I need to create a unique stream that contains all items that came from all other streams. I need also to add "new" stream to the "main" stream. I don't care from which stream the next item comes from.

For doing that, I see there's the select_all function that should do the logic described above.

pub struct WsPool {
    merge: Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>,
}

impl WsPool {
    pub fn new() -> Self {
        Self {
            merge: Arc::new(Mutex::new(SelectAll::new())),
        }
    }

    pub fn add(&self, s: Box<dyn Stream<Item = MyItem> + Send + 'static>) {
        let mut merge  = self.merge.lock().unwrap();

        merge.push(s);
    }

    pub async fn process(&self) {
        loop {
            let mut merge = self.merge.lock().unwrap();
            let item = merge.await.next();
        }
    }
}

But I receive these errors:

error[E0277]: `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
  --> src/ws_pool.rs:30:24
   |
30 |             let item = merge.await.next();
   |                        ^^^^^^^^^^^ `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
   |
   = help: the trait `futures::Future` is not implemented for `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`
note: required by `futures::Future::poll`
  --> /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:99:5
   |
99 |     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error[E0277]: `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)` cannot be unpinned
  --> src/ws_pool.rs:17:40
   |
17 |             merge: Arc::new(Mutex::new(SelectAll::new())),
   |                                        ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)`
   |
   = note: consider using `Box::pin`
   = note: required because of the requirements on the impl of `futures::Stream` for `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>`
note: required by `futures::stream::SelectAll::<St>::new`
  --> /home/allevo/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.17/src/stream/select_all.rs:47:5
   |
47 |     pub fn new() -> Self {
   |     ^^^^^^^^^^^^^^^^^^^^

error[E0599]: the method `push` exists for struct `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`, but its trait bounds were not satisfied
   --> src/ws_pool.rs:24:15
    |
24  |           merge.push(s);
    |                 ^^^^ method cannot be called on `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` due to unsatisfied trait bounds
    |
   ::: /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:172:1
    |
172 | / pub struct Box<
173 | |     T: ?Sized,
174 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
175 | | >(Unique<T>, A);
    | |________________- doesn't satisfy `_: futures::Stream`
    |
    = note: the following trait bounds were not satisfied:
            `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>: futures::Stream`

What am I doing wrong? Otherwise how can I store multiple streams and iterate over them?

Upvotes: 2

Views: 596

Answers (1)

kmdreko
kmdreko

Reputation: 60493

Your first problem is a simple mixup. You want to await the next value, not the stream itself:

let item = merge.next().await;

The resulting errors are all because SelectAll<Box<dyn Stream + Send + 'static>> does not implement Stream. If you look at the impl Stream for SelectAll, it is constrained that the inner stream type implements Unpin.

You can fix this by adding it to the bounds:

use std::marker::Unpin;
                                               // vvvvv
Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Unpin + Send + 'static>>>>

Or a better solution is to just pin the streams:

use std::pin::Pin;
                 // vvv
Arc<Mutex<SelectAll<Pin<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>>

The difference is the latter can accept more Stream types. You just have to use Box::pin when adding them.

See it working on the Playground.

Upvotes: 3

Related Questions