Reputation: 123
I'd like a function which asynchronously processes a variable amount of (Sink
, Stream
) tuples.
use futures::channel::mpsc;
use futures::{Sink, Stream, SinkExt, StreamExt};
async fn foo(v: Vec<(Box<dyn Sink<Error = std::io::Error>>, Box<dyn Stream<Item = u8>>)>) {
for (mut tx, mut rx) in v {
let _ = tx.send(0);
let _ = rx.next().await;
}
}
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::channel(32);
foo(vec![(Box::new(tx), Box::new(rx))]).await;
Ok(())
}
But I get this compilation error:
error[E0107]: wrong number of type arguments: expected 1, found 0 --> src/main.rs:4:30 | 4 | async fn foo(v: Vec<(Box<dyn Sink<Error = std::io::Error>>, Box<dyn Stream<Item = u8>>)>) { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected 1 type argument
I was prompted to declare the associated type for the trait object that way by the compiler itself. I'm unsure why it does not accept it.
Upvotes: 0
Views: 470
Reputation: 155465
The compiler wants you to specify the "type argument" of the Sink
. This is not the error type, but the type of the item being sent down the sink, as in Sink<Foo>
. You specify u8
as the type of the stream, and are sending the value unchanged between one and the other, so you probably want a Sink<u8>
.
Once you do that, the compiler will next complain that you need to specify the Error
associated type (this time for real). However if you specify std::io::Error
, the call to foo()
from main()
won't compile because the implementation of Sink
for mpsc::Sender
specifies its own mpsc::SendError
as the error type.
Finally, both the sink and the stream need to be pinned so they can live across await points. This is done by using Pin<Box<...>>
instead of Box<...>
and Box::pin(...)
instead of Box::new(...)
.
With the above changes, a version that compiles looks like this:
use futures::channel::mpsc;
use futures::{Sink, SinkExt, Stream, StreamExt};
use std::pin::Pin;
async fn foo(
v: Vec<(
Pin<Box<dyn Sink<u8, Error = mpsc::SendError>>>,
Pin<Box<dyn Stream<Item = u8>>>,
)>,
) {
for (mut tx, mut rx) in v {
let _ = tx.send(0);
let _ = rx.next().await;
}
}
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, rx) = mpsc::channel(32);
foo(vec![(Box::pin(tx), Box::pin(rx))]).await;
Ok(())
}
Upvotes: 2