Reputation: 73
I am currently sending closures/functions across threads.
This works perfectly fine for sync functions.
I am specifically passing
pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync>;
Example function being sent
pub fn update_league(req: WSReq, conn: PgConn, _: &mut WSConnections_, _: Uuid) -> Result<String, BoxError>{
let deserialized = serde_json::from_value(req.data)?;
let league = db::update_league(&conn, deserialized)?;
let resp_msg = WSMsgOut::resp(req.message_id, req.method, league);
serde_json::to_string(&resp_msg).map_err(|e| e.into())
}
However, I would now like to switch to sending async functions, e.g.:
pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data)?;
let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec())?;
if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
}
publish_competitions(ws_conns, &competitions_out).await;
let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
serde_json::to_string(&resp_msg).map_err(|e| e.into())
}
It's the exact same function signature, just async.
Where I box the functions so they can be sent around, I get this error:
Box::new(upsert_competitions))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected enum `std::result::Result`, found opaque type
full:
288 | pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
| ------------------------ the `Output` of this `async fn`'s found opaque type
|
= note: expected enum `std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>`
found opaque type `impl core::future::future::Future`
= note: required for the cast to the object type `dyn for<'r> std::ops::Fn(warp_ws_server::WSReq, diesel::r2d2::PooledConnection<diesel::r2d2::ConnectionManager<diesel::PgConnection>>, &'r mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>, uuid::Uuid) -> std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>> + std::marker::Send + std::marker::Sync`
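The error above follows from how `async fn` works: calling it does not run the body, it returns an anonymous type implementing `Future` whose `Output` is the declared return type. A minimal std-only sketch of that difference, assuming a one-argument stand-in for the handler signature (`SyncMethod`, `sync_handler`, `async_handler`, `require_future` and the toy `block_on` are hypothetical names, not part of the original code):

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for the question's `BoxError`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Same shape as the synchronous `WSMethod` alias, reduced to one argument.
type SyncMethod = Box<dyn Fn(String) -> Result<String, BoxError> + Send + Sync>;

fn sync_handler(req: String) -> Result<String, BoxError> {
    Ok(format!("sync:{}", req))
}

async fn async_handler(req: String) -> Result<String, BoxError> {
    Ok(format!("async:{}", req))
}

// Compiles only if `F` is a future producing the handler's declared return type.
fn require_future<F>(fut: F) -> F
where
    F: Future<Output = Result<String, BoxError>>,
{
    fut
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop. Only suitable for futures that never wait on I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo() -> (String, String) {
    // A plain fn coerces to the boxed `Fn -> Result` trait object.
    let boxed: SyncMethod = Box::new(sync_handler);
    let sync_out = boxed("a".to_string()).unwrap();

    // `Box::new(async_handler)` would NOT coerce to `SyncMethod`:
    // the call returns a future, which has to be driven to get the Result.
    let fut = require_future(async_handler("b".to_string()));
    let async_out = block_on(fut).unwrap();
    (sync_out, async_out)
}
```

In a real program the future would be awaited on a runtime such as tokio rather than driven by the toy `block_on`.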
I have tried attaching .await at the call site of the passed method, i.e. method(req, conn, ws_conns, user_ws_id).await.
This causes compiler errors there because Future is not implemented for Result.
Therefore I changed the type from
Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync>
to
Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Future<Output=Result<String, BoxError>>) + Send + Sync>
The compiler then complains about the size of the future, so I box the future; then there is another error (about Unpin), so I pin the future.
Eventually leading to
Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync >>) + Send + Sync>
The error is now
Box::new(upsert_competitions)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `std::pin::Pin`, found opaque type
expected struct `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync>>`
found opaque type `impl core::future::future::Future`
I don't understand where to go from here. I don't think I should be pinning/boxing the function itself; I want to pin/box the future returned when the function is called, but I don't see how to express that,
since the future only exists after the function has been called, not earlier.
I also tried things like
Box::new(Pin::new(Box::new(upsert_competitions)))
based on the above error, and then the compiler reports that it expected an Fn<blah>.... rather than a Pin<Box<....
Source of full up-to-date code:
closure being successfully passed as a regular function
closure being unsuccessfully passed as an async func
Edit:
Latest updates (progressed the error)
pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
async fn hmmm(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data).expect("fuck");
println!("{:?}", &deserialized);
let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec()).expect("fuck");
// assume anything upserted the user wants to subscribe to
if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
}
// TODO ideally would return response before awaiting publishing going out
publish_competitions(ws_conns, &competitions_out).await;
println!("{:?}", &competitions_out);
let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
let out = serde_json::to_string(&resp_msg).map_err(|e| e.into());
out
}
Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
}
305 | Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `hmmm` is not `Sync`
So now I just need to work out how to make this future Sync.
The note "future is not `Sync` as this value is used across an await" gives me a good clue:
299 | publish_competitions(ws_conns, &competitions_out).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with conn maybe used later
I worked out that I have to keep the use of conn outside of the inner async function, so that it is not held across an await.
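The restructuring described here can be sketched with std only: a value that is `Send` but not `Sync` is used in the synchronous outer function, and only its results are moved into the future. `FakeConn` (built on `Cell`, which is not `Sync`), `run_query`, `publish`, `handler` and the toy `block_on` are hypothetical stand-ins, not the question's real types:

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxedSyncFuture<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;

// Stand-in for a DB connection: `Cell` makes it `Send` but not `Sync`.
struct FakeConn {
    queries: Cell<u32>,
}

impl FakeConn {
    fn new() -> Self {
        FakeConn { queries: Cell::new(0) }
    }
}

// Synchronous "database" work that needs the connection.
fn run_query(conn: &FakeConn, input: u32) -> u32 {
    conn.queries.set(conn.queries.get() + 1);
    input * 2
}

async fn publish(_value: u32) {}

// The connection is used before the future is created, so the future
// only captures `value` and can be both `Send` and `Sync`.
fn handler(conn: FakeConn, input: u32) -> BoxedSyncFuture<u32> {
    let value = run_query(&conn, input);
    Box::pin(async move {
        publish(value).await;
        value
    })
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop. Only suitable for futures that never wait on I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Moving `conn` into the `async move` block and touching it after `publish(value).await` would make the future non-`Sync` again, which is the situation the compiler note describes.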
After fixing variables across await, I now arrive at
error[E0621]: explicit lifetime required in the type of `ws_conns`
--> src/handlers.rs:305:5
|
289 | pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
| ------------------- help: add explicit lifetime `'static` to the type of `ws_conns`: `&'static mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>`
...
305 | Box::pin(hmmm(req, competitions_out, ws_conns, user_ws_id))
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ lifetime `'static` required
I tried making the references &'static, but eventually I get to a point where that's not OK.
I also tried making the function generic instead, upsert_competitions<U: lock_api::RawMutex + 'static>,
but then I get: the trait lock_api::mutex::RawMutex is not implemented for std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>
I need to find a U that provides .lock(), but is also a trait that Arc implements.
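One possible way around the `'static` requirement, sketched under assumptions: a boxed `dyn Future` with no lifetime annotation defaults to `'static`, so a future that borrows `ws_conns` needs the borrow's lifetime named in the return type. Here `Vec<String>` stands in for the connection map, and `BoxFut`, `publish`, `handler` and `block_on` are hypothetical names:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A boxed future that may borrow data for `'a` instead of requiring `'static`.
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Stand-in for an async helper that needs mutable access to shared state.
async fn publish(conns: &mut Vec<String>, msg: String) -> usize {
    conns.push(msg);
    conns.len()
}

// The returned future holds the `&'a mut` borrow, and the type says so.
fn handler<'a>(conns: &'a mut Vec<String>, msg: String) -> BoxFut<'a, usize> {
    Box::pin(publish(conns, msg))
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop. Only suitable for futures that never wait on I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Since the real `ws_conns` is an `Arc`, another option would be to pass a clone of the `Arc` by value, in which case the future owns its handle and can stay `'static`.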
Upvotes: 1
Views: 953
Reputation: 73
user1937's simple answer likely works (I will test it later).
However, overnight I realised that putting functions into a hashmap and passing around references to them was overkill.
This is a use case for traits: in one place I don't know the implementation but can define an interface, and in the other place I implement that interface.
So instead I defined an async trait (this currently requires the async-trait crate) in my lib:
#[async_trait]
pub trait WSHandler<T: Subscriptions>{
async fn ws_req_resp(
msg: String, conn: PgConn, ws_conns: &mut WSConnections<T>, user_ws_id: Uuid
) -> Result<String, BoxError>;
}
And told its functions to expect a generic WSHandler:
async fn handle_ws_msg<T: Subscriptions, U: WSHandler<T>>(
msg: ws::Message, conn: PgConn, ws_conns: &mut WSConnections<T>, user_ws_id: Uuid
) -> ws::Message{
match msg.to_str(){
// Can't get await inside `and_then`/`map` function chains to work properly
Ok(msg_str) => match U::ws_req_resp(msg_str.to_string(), conn, ws_conns, user_ws_id).await{
Ok(text) => ws::Message::text(text),
Err(e) => ws_error_resp(e.to_string())
},
Err(_) => ws_error_resp(String::from("wtf. How does msg.to_str fail?"))
}
}
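For comparison, the trait-plus-generic-dispatch pattern can be written by hand without the async-trait crate, by having the trait method return a boxed future. This is only a rough approximation of what the macro generates, with `Vec<String>` standing in for the connection map; `Handler`, `Echo`, `echo`, `handle_msg` and `block_on` are hypothetical names:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hand-written equivalent of an async trait method: it returns a boxed future.
trait Handler {
    fn respond<'a>(msg: String, conns: &'a mut Vec<String>) -> BoxFut<'a, Result<String, String>>;
}

struct Echo;

async fn echo(msg: String, conns: &mut Vec<String>) -> Result<String, String> {
    conns.push(msg.clone());
    Ok(format!("echo:{}", msg))
}

impl Handler for Echo {
    fn respond<'a>(msg: String, conns: &'a mut Vec<String>) -> BoxFut<'a, Result<String, String>> {
        Box::pin(echo(msg, conns))
    }
}

// Library side: generic over the handler, with no knowledge of the implementation.
async fn handle_msg<H: Handler>(msg: Option<String>, conns: &mut Vec<String>) -> String {
    match msg {
        Some(text) => match H::respond(text, conns).await {
            Ok(out) => out,
            Err(e) => format!("error:{}", e),
        },
        None => "error:not text".to_string(),
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop. Only suitable for futures that never wait on I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The handler is chosen at compile time through the type parameter, e.g. `handle_msg::<Echo>(...)`, which mirrors the `U::ws_req_resp(...)` call above.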
Then in my main program I was able to impl the trait:
struct A{
}
#[async_trait]
impl WSHandler<subscriptions::Subscriptions> for A{
async fn ws_req_resp(
msg: String, conn: PgConn, ws_conns: &mut WSConnections<subscriptions::Subscriptions>, user_ws_id: Uuid
) -> Result<String, BoxError>{
let req: WSReq = serde_json::from_str(&msg)?;
println!("{}", &req.data);
let stringybob = String::from("upsert_competitions");
match req.method.clone(){
a if a == stringybob => upsert_competitions2(req, conn, ws_conns, user_ws_id).await,
// imagine the other methods here
uwotm8 => Err(Box::new(InvalidRequestError{description: uwotm8.to_string()}))
}
}
}
ws.on_upgrade(move |socket| warp_ws_server::handle_ws_conn::<subscriptions::Subscriptions, A>(socket, pool, ws_conns))
after 14 hours it finally runs. hooray :D
Upvotes: 0
Reputation: 5348
Calling an async function returns a plain future, not a pinned and boxed one, because a future only needs to be pinned once you start polling it. Handing out futures that are pinned from the start would make building up composed futures from multiple async functions less efficient and more complicated. So conceptually the correct type is pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> [[UNNAMED TYPE implementing Future<Output=Result<String, BoxError>>]] + Send + Sync>;
But you can't name that type [[UNNAMED TYPE implementing Future]], so you need to box the future manually. The easiest way of doing this is with the boxed method from FutureExt in the futures crate.
So you need to combine changing the type to Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send>>) + Send + Sync> (boxed returns a future that is Send but not Sync, so the + Sync on the inner future has to go)
with replacing the reference to the method with a closure that boxes the future: Box::new(|req, conn, connections, uuid| upsert_competitions(req, conn, connections, uuid).boxed())
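A std-only sketch of this approach, assuming a reduced two-argument handler signature: it uses `Box::pin` where the answer uses `.boxed()`, and it ties the future's lifetime to the `&mut` borrow with `for<'a>`, which is an addition not spelled out above. `AsyncMethod`, `upsert`, `registry`, `dispatch` and the toy `block_on` are hypothetical names:

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxError = Box<dyn std::error::Error + Send + Sync>;
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Async counterpart of `WSMethod`, reduced to two arguments.
type AsyncMethod = Box<
    dyn for<'a> Fn(String, &'a mut Vec<String>) -> BoxFut<'a, Result<String, BoxError>>
        + Send
        + Sync,
>;

async fn upsert(req: String, conns: &mut Vec<String>) -> Result<String, BoxError> {
    conns.push(req.clone());
    Ok(format!("upserted:{}", req))
}

// The closure calls the async fn and boxes the returned future at call time.
fn registry() -> HashMap<&'static str, AsyncMethod> {
    let mut methods: HashMap<&'static str, AsyncMethod> = HashMap::new();
    methods.insert("upsert", Box::new(|req, conns| Box::pin(upsert(req, conns))));
    methods
}

// Call site: look the method up, call it, then await the boxed future.
async fn dispatch(
    methods: &HashMap<&'static str, AsyncMethod>,
    name: &str,
    req: String,
    conns: &mut Vec<String>,
) -> Result<String, BoxError> {
    match methods.get(name) {
        Some(method) => method(req, conns).await,
        None => Err(format!("unknown method: {}", name).into()),
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop. Only suitable for futures that never wait on I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The boxing happens inside the closure, i.e. after the async function has been called, which is the point raised in the question.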
Upvotes: 2