Reputation: 680
My goal is to reduce the following read_stream_***() functions into a generic fuction that can be passed different streams.
use async_std::net::TcpStream;
use async_std::{ task };
use async_std::io::{ stdin, BufReader, Stdin };
use async_std:: { prelude::* };
use futures::{select, FutureExt, AsyncRead };
pub async fn read_stream_stdin(streem:Stdin) -> Result<(), std::io::Error>
{
let mut lines_from_stream = BufReader::new(streem).lines().fuse();
loop {
select! {
line = lines_from_stream.next().fuse() => match line {
Some(line) => {
println!("{:?}",line?);
}
None => break,
}
}
}
Ok(())
}
pub async fn read_stream_tcp(streem:TcpStream) -> Result<(), std::io::Error>
{
let mut lines_from_stream = BufReader::new(streem).lines().fuse();
loop {
select! {
line = lines_from_stream.next().fuse() => match line {
Some(line) => {
println!("{:?}",line?);
}
None => break,
}
}
}
Ok(())
}
pub async fn connect_tcp_server(host_port:&str) -> Result<(), std::io::Error>
{
let streem = TcpStream::connect(host_port).await;
let _result = task::block_on(read_stream_tcp(streem?));
Ok(())
}
fn main() -> Result<(), std::io::Error> {
task::spawn( connect_tcp_server("127.0.0.1:8081") );
task::block_on(read_stream_stdin(stdin()))
}
The Generic Attempt:
pub async fn read_stream<T>(streem:T) -> Result<(), std::io::Error>
{
let mut lines_from_stream = BufReader::new(streem).lines().fuse();
loop {
select! {
line = lines_from_stream.next().fuse() => match line {
Some(line) => {
println!("{:?}",line?);
}
None => break,
}
}
}
Ok(())
}
The Cargo.toml
[package]
name = "gen_func"
version = "0.1.0"
edition = "2021"
[dependencies]
async-std = "1.9.0"
futures = "0.3.21"
I attempted <T: async_std::io::Read> but fuse() and lines() are not implemented. and AsyncRead is not found in async_std::io . I found AsyncRead in futures crate but again fuse() and lines() were not implemented. I am not set on the read pattern. I am new to Rust and trying to build my source library to solve future programming tasks.
Upvotes: 1
Views: 134
Reputation: 2401
First, as pointed out by kmdreko, the logic of your function(s) can be greatly simplified (at least based on the information given):
pub async fn read_stream_tcp(stream: TcpStream) -> Result<(), std::io::Error> {
let mut lines = BufReader::new(stream).lines();
while let Some(line) = lines.next().await {
println!("{line:?}");
}
}
Ok(())
Then, to figure out how to make this generic, you can just let the compiler tell you what it needs:
pub async fn read_stream<T>(stream: T) -> Result<(), std::io::Error>
{
let mut lines = BufReader::new(stream).lines();
while let Some(line) = lines.next().await {
println!("{line:?}");
}
Ok(())
}
Notice the lack of where
clauses or other constraints on T
. The compiler will now complain:
error[E0277]: the trait bound `T: async_std::io::Read` is not satisfied --> src/main.rs:15:36 |
15 | let mut lines = BufReader::new(stream).lines();
| -------------- ^^^^^^ the trait `async_std::io::Read` is not implemented for `T`
| |
| required by a bound introduced by this call
|
note: required by a bound in `async_std::io::BufReader::<R>::new`
--> /home/lucas/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/io/buf_reader.rs:55:9
|
55 | impl<R: io::Read> BufReader<R> {
| ^^^^^^^^ required by this bound in `async_std::io::BufReader::<R>::new`
help: consider restricting type parameter `T`
|
13 | pub async fn read_stream<T: async_std::io::Read>(stream: T) -> Result<(), std::io::Error>
| +++++++++++++++++++++
Applying the compiler's suggestions (the above will result in a follow-up error) yields a full where
clause of T: async_std::io::Read + std::marker::Unpin
:
pub async fn read_stream<T>(stream: T) -> Result<(), std::io::Error>
where
T: Read + std::marker::Unpin,
{
let mut lines = BufReader::new(stream).lines();
while let Some(line) = lines.next().await {
println!("{line:?}");
}
Ok(())
}
async fn try_it() {
// These will now compile just fine
read_stream(async_std::io::stdin()).await.unwrap();
read_stream(TcpStream::connect("127.0.0.1:8080").await.unwrap()).await.unwrap();
}
I attempted <T: async_std::io::Read> but fuse() and lines() are not implemented
This suggests that you tried replacing BufReader::new(stream)
at the same time. You can do that, but you need to tell the compiler that you need something that implements the lines()
method. Either make the parameter a fixed type BufReader<T>
or make the where
clause T: async_std::io::BufRead + std::marker::Unpin
for a generic type.
Upvotes: 1