Reputation: 37
I need to handle async data streams from several different sources which should all be parsed into some normalized format.
Ideally I would like to:
Write a handler for each source that implements some Handler trait so downstream callers can interact with the data streams and be agnostic with respect to the underlying implementation.
Have some function which can return a Box<dyn Handler> given a key, since there are a limited/predictable number of sources for which Handler is implemented.
I've tried something like the following using #[async_trait], but I can't get it to compile, mostly with the error "the trait Handler cannot be made into an object".
Is there a better/more idiomatic way to approach this problem?
use async_trait; // 0.1.52
use tokio; // 1.15.0
#[async_trait::async_trait]
trait Handler {
    type Output;
    async fn connect() -> Self::Output;
    async fn read_parse(&self) -> Vec<i32>;
    async fn run(&self) {
        for _ in 0..5 {
            self.read_parse().await;
        }
    }
}
struct FooHandler;
#[async_trait::async_trait]
impl Handler for FooHandler {
    type Output = FooHandler;
    async fn connect() -> Self::Output {
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
        FooHandler {}
    }
    async fn read_parse(&self) -> Vec<i32> {
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
        vec![1, 2, 3]
    }
}
struct BarHandler;
#[async_trait::async_trait]
impl Handler for BarHandler {
    type Output = BarHandler;
    async fn connect() -> Self::Output {
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
        BarHandler {}
    }
    async fn read_parse(&self) -> Vec<i32> {
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
        vec![1, 2, 3]
    }
}
async fn get_handler(name: &str) -> Option<Box<dyn Handler>> {
    match name {
        "FOO" => Some(Box::new(FooHandler::connect().await)),
        "BAR" => Some(Box::new(BarHandler::connect().await)),
        _ => None,
    }
}
#[tokio::main]
async fn main() {
    let handler = get_handler("FOO").await.unwrap();
    handler.run().await;
}
EDIT:
Per the answer from cameron1024 I was able to get the following to work:
use async_trait; // 0.1.52
use tokio; // 1.15.0
#[async_trait::async_trait]
trait Handler: Sync {
    async fn connect() -> Self
    where
        Self: Sized;
    async fn read_parse(&self) -> Vec<i32>;
    async fn run(&self) {
        for _ in 0..5 {
            let res = self.read_parse().await;
            println!("{:?}", res);
        }
    }
}
struct FooHandler;
#[async_trait::async_trait]
impl Handler for FooHandler {
    async fn connect() -> Self {
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        println!("connected");
        FooHandler {}
    }
    async fn read_parse(&self) -> Vec<i32> {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        vec![1, 2, 3]
    }
}
async fn get_handler(name: &str) -> Option<Box<dyn Handler>> {
    match name {
        "FOO" => Some(Box::new(FooHandler::connect().await)),
        _ => None,
    }
}
#[tokio::main]
async fn main() {
    // let handler = FooHandler::connect().await;
    let handler = get_handler("FOO").await.unwrap();
    handler.run().await;
}
Upvotes: 0
Views: 974
Reputation: 10136
There are 2 issues here.
First, for an async trait to have default implementations, the trait itself needs Send or Sync as a supertrait, depending on the receiver type: Sync for methods that take &self, Send for methods that take &mut self. There is a brief explanation in the "dyn traits" section of the readme for the crate: https://docs.rs/async-trait/latest/async_trait/
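To see where the Sync requirement comes from, it can help to write out by hand roughly what such a trait looks like once async methods become methods returning boxed Send futures. The following is a simplified, dependency-free sketch, not the macro's actual output: the BoxFuture alias, the usize return value of run, and the tiny busy-polling block_on helper are all assumptions made for illustration. The default run captures &self inside a future that must be Send, and &Self is Send only when Self is Sync.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias for a boxed future that must be `Send`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// `Sync` supertrait: the default `run` captures `&self` in a `Send` future,
// and `&Self` is `Send` only when `Self: Sync`.
trait Handler: Sync {
    fn read_parse(&self) -> BoxFuture<'_, Vec<i32>>;

    // Default implementation; returns the total number of values read
    // (an illustrative choice so the result can be checked).
    fn run(&self) -> BoxFuture<'_, usize> {
        Box::pin(async move {
            let mut total = 0;
            for _ in 0..5 {
                total += self.read_parse().await.len();
            }
            total
        })
    }
}

struct FooHandler;

impl Handler for FooHandler {
    fn read_parse(&self) -> BoxFuture<'_, Vec<i32>> {
        Box::pin(async { vec![1, 2, 3] })
    }
}

// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal stand-in for a runtime: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let handler: Box<dyn Handler> = Box::new(FooHandler);
    let total = block_on(handler.run());
    println!("read {} values", total);
}
```

Removing the Sync supertrait from this sketch makes the default run fail to compile, because the future it returns would no longer be Send.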
Second, the connect function is not object safe, because it is an associated function with no self receiver. This has nothing to do with async-ness; it's an object safety concern. This simplified example has the same issue:
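fn main() {
    // Fails to compile: `Foo` cannot be made into an object.
    let x: Box<dyn Foo<Bar = ()>> = Box::new(());
}
trait Foo {
    type Bar;
    // No `self` receiver, so this cannot be called through a trait object.
    fn connect() -> Self::Bar;
}
impl Foo for () {
    type Bar = ();
    fn connect() -> Self::Bar {
        todo!()
    }
}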
However, you're unlikely to want to invoke connect on a trait object, since it has no self parameter. Instead, you can opt that specific function out of the trait object by adding the Self: Sized constraint to connect.
You can then create a trait object, but connect won't be available.
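As a minimal sketch of that opt-out, here is a synchronous version of the pattern with no external crates, so the object safety part can be seen in isolation. The handler names follow the question; the values BarHandler returns are made up for illustration.

```rust
trait Handler {
    // Constructor with no `self` receiver. The `Self: Sized` bound excludes
    // it from the trait object, so `dyn Handler` stays object safe.
    fn connect() -> Self
    where
        Self: Sized;

    // Regular method; callable through `Box<dyn Handler>`.
    fn read_parse(&self) -> Vec<i32>;
}

struct FooHandler;

impl Handler for FooHandler {
    fn connect() -> Self {
        FooHandler
    }

    fn read_parse(&self) -> Vec<i32> {
        vec![1, 2, 3]
    }
}

struct BarHandler;

impl Handler for BarHandler {
    fn connect() -> Self {
        BarHandler
    }

    // Illustrative values only, chosen to differ from FooHandler.
    fn read_parse(&self) -> Vec<i32> {
        vec![4, 5, 6]
    }
}

// `connect` is called on the concrete types, where `Self: Sized` holds,
// and the result is then erased into a trait object.
fn get_handler(name: &str) -> Option<Box<dyn Handler>> {
    match name {
        "FOO" => Some(Box::new(FooHandler::connect())),
        "BAR" => Some(Box::new(BarHandler::connect())),
        _ => None,
    }
}

fn main() {
    let handler = get_handler("FOO").expect("known key");
    println!("{:?}", handler.read_parse());
}
```

Callers only ever see Box<dyn Handler>, while construction stays on the concrete types inside get_handler.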
Upvotes: 2