Skru
Skru

Reputation: 115

Lifetime issues with async closure in rust

I want to wrap a db transaction from the diesel-async crate into a generic connection object that provides its own async transaction method. Unfortunately, whatever I try, the borrow checker complains about my lifetimes, and I don't understand them well enough yet to solve this on my own.

Quick overview:

Most parts of the code should work, except for the transaction method.

My questions are:

# Cargo.toml

[dependencies]
tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] }
diesel = { version = "2.2.4", features = ["postgres"]}
diesel-async = { version = "0.5.0", features = ["postgres"] }
// main.rs

use std::future::Future;
use std::pin::Pin;

use diesel_async::{AsyncConnection, AsyncPgConnection};
use diesel_async::scoped_futures::ScopedFutureExt;

// Business logic as async functions that can be shared between threads
trait Service: Send + Sync + 'static {

    /// Update value of item if new value is larger than current value.
    /// Example function that needs multiple repo functions
    fn maximize_item(&self, id: i32, value: i32) -> impl Future<Output=Result<i32, String>> + Send;
}

// Persist the data in e.g. a database
// The repository provides a new connection and expects that repo methods are called with that connection
trait Repository: Send + Sync + 'static {
    type Conn: Connection;

    /// Connect to data storage, or return an error if the attempt failed
    fn connect(&self) -> impl Future<Output=Result<Self::Conn, String>> + Send;

    /// Get current value for item, or return an error if item doesn't exist or value is not retrievable
    fn get_item_value(&self, conn: &mut Self::Conn, id: i32) -> impl Future<Output=Result<i32, String>> + Send;

    /// Update value for item, or return an error if item doesn't exist or can't be updated
    fn update_item_value(&self, conn: &mut Self::Conn, id: i32, value: i32) -> impl Future<Output=Result<i32, String>> + Send;
}


// Connection to some data storage
trait Connection: AsMut<Self::SpecificConn> + Send + Sync + 'static {
    type SpecificConn: Send + Sync;

    /// Provide a generic transaction as closure that is agnostic to the underlying db or ORM.
    /// Commit if closure returns `Ok`, rollback on `Err`
    // The lifetimes are wrong, and possibly more, but I don't know how to fix this method
    fn transaction<'a, F, R>(&'a mut self, callback: F) -> impl Future<Output=Result<R, String>> + Send
    where
        F: FnOnce(&'a mut Self) -> Pin<Box<dyn Future<Output=Result<R, String>> + Send>> + Send,
        R: Send;
}

struct ServiceImpl<R: Repository> {
    repo: R, // service holds a generic repository to persist data
}

// implementation for Service while keeping the repository generic
impl<R: Repository> Service for ServiceImpl<R> {

    // Service methods sometimes need to call several repository functions and must
    // ensure they run as one atomic transaction. Instead of writing business logic that rolls
    // back failed attempts by hand, I would rather use a database transaction for this
    async fn maximize_item(&self, id: i32, value: i32) -> Result<i32, String> {

        // get new db connection wrapped into a generic object
        let mut conn = self.repo.connect().await?;

        // wrap repository tasks into an atomic but generic transaction. The repo needs
        // to ensure a transaction is atomic
        conn.transaction(|conn: &mut R::Conn| Box::pin(async move {

            // get value and update value are different methods, but
            // should be executed as atomic transaction here
            let current = self.repo.get_item_value(conn, id).await?;
            if value > current {
                let updated = self.repo.update_item_value(conn, id, value).await?;
                Ok(updated)
            } else {
                Ok(current)
            }
        })).await
    }
}

struct RepositoryForSpecificDbType; // unit struct without fields; its `Repository` impl supplies all behaviour

// implement the repository for a specific database and ORM (or a mock for testing).
// In this case it is Postgres with Diesel
impl<'a> Repository for RepositoryForSpecificDbType {
    type Conn = ConnectionToSpecificDbType<'a>;

    // wrap diesel connection to postgres db into a generic connection object
    async fn connect(&self) -> Result<ConnectionToSpecificDbType, String> {
        let dsn = "postgres:://root:****@localhost:my_db"; // NOTE(review): not in the usual `postgres://user:password@host/dbname` form — confirm before use against a real db
        let conn = AsyncPgConnection::establish(dsn).await.map_err(|_| "Connection failed".to_string())?;
        Ok(ConnectionToSpecificDbType::new(conn))
    }

    async fn get_item_value(&self, _conn: &mut Self::Conn, id: i32) -> Result<i32, String> {
        // mocked lookup: no database access, the "stored" value is always `id * 2`
        Ok(id * 2)
    }

    async fn update_item_value(&self, _conn: &mut Self::Conn, _id: i32, value: i32) -> Result<i32, String> {
        // mocked update: nothing is persisted, the requested value is echoed back
        Ok(value)
    }
}

struct ConnectionToSpecificDbType<'c> {
    /// Owned connection: `Some` when built with `new`, `None` when built with `new_from_mut`
    conn: Option<AsyncPgConnection>,

    // Borrowed connection: `Some` when built with `new_from_mut`, i.e. when constructed inside
    // the diesel transaction closure; `None` when built with `new`
    conn_mut: Option<&'c mut AsyncPgConnection>,
}

impl<'c> ConnectionToSpecificDbType<'_> {
    fn new(conn: AsyncPgConnection) -> ConnectionToSpecificDbType<'c> { // owning variant: takes the connection by value
        ConnectionToSpecificDbType {
            conn: Some(conn),
            conn_mut: None,
        }
    }

    fn new_from_mut(conn: &'c mut AsyncPgConnection) -> ConnectionToSpecificDbType<'c> { // borrowing variant: keeps only the `&'c mut` reference
        ConnectionToSpecificDbType {
            conn: None,
            conn_mut: Some(conn),
        }
    }
}

// get mutable reference to connection
impl<'c> AsMut<AsyncPgConnection> for ConnectionToSpecificDbType<'_> {
    fn as_mut(&mut self) -> &mut AsyncPgConnection {
        // the struct either owns the postgres connection (outside the closure) or holds a mutable reference to it (inside the closure)
        // is there a better way to handle this? (the final `unwrap` panics if both fields are `None`)
        if self.conn.is_some() { self.conn.as_mut().unwrap() } else { self.conn_mut.as_mut().unwrap() }
    }
}

#[derive(Debug)]
enum TransactionError<E: Send + Sync> {
    UserCaused(E), // error returned by the caller's transaction callback
    DieselCaused(diesel::result::Error), // error coming from diesel (built by the `From` impl)
}

// A diesel transaction requires that the closure's error type implements `From<diesel::result::Error>`.
// To stay generic, handle errors in the closure and translate them into a generic error
impl<E: Send + Sync> From<diesel::result::Error> for TransactionError<E> {
    fn from(value: diesel::result::Error) -> Self {
        TransactionError::DieselCaused(value)
    }
}

impl<'c> Connection for ConnectionToSpecificDbType<'_> {
    type SpecificConn = AsyncPgConnection;

    // This function doesn't compile (see the compiler errors listed below);
    // it should provide a generic abstraction of a transaction
    async fn transaction<'a, F, R>(&'a mut self, callback: F) -> Result<R, String>
    where
        F: FnOnce(&'a mut Self) -> Pin<Box<dyn Future<Output=Result<R, String>> + Send>> + Send,
        R: Send,
    {
        let result = self.as_mut().transaction(|diesel_conn| async move {

            // wrap mut ref to diesel connection into connection object that implements Connection
            let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);

            // call closure
            let result = callback(&mut inner_conn).await;

            // translate into generic intermediate error type that implements From<Error>
            result.map_err(|e| TransactionError::UserCaused(e))
        }.scope_boxed()).await;

        // return result
        result.map_err(|e| format!("{e:?}").to_string())
    }
}

// Runs the example. The mocked repository reports `id * 2` as the current value, so item 5
// is at 10 in every call: 3 and 8 leave it at 10, while 13 and 21 replace it.
#[tokio::main]
async fn main() {
    // create a new service instance and define which repository implementation is used
    let service = ServiceImpl {
        repo: RepositoryForSpecificDbType
    };

    assert_eq!(service.maximize_item(5, 3).await.unwrap(), 10);
    assert_eq!(service.maximize_item(5, 8).await.unwrap(), 10);
    assert_eq!(service.maximize_item(5, 13).await.unwrap(), 13);
    assert_eq!(service.maximize_item(5, 21).await.unwrap(), 21);
}

This is the current list of compiler errors I get:

error[E0597]: `conn` does not live long enough
  --> src/main.rs:61:9
   |
57 |           let mut conn = self.repo.connect().await?;
   |               -------- binding `conn` declared here
...
61 |           conn.transaction(|conn: &mut R::Conn| Box::pin(async move {
   |  _________^^^^__________________________________-
   | |         |
   | |         borrowed value does not live long enough
62 | |
63 | |             // get value and update value are different methods, but
64 | |             // should be executed as atomic transaction here
...  |
71 | |             }
72 | |         })).await
   | |__________- returning this value requires that `conn` is borrowed for `'static`
73 |       }
   |       - `conn` dropped here while still borrowed

error: lifetime may not live long enough
  --> src/main.rs:61:47
   |
54 |       async fn maximize_item(&self, id: i32, value: i32) -> Result<i32, String> {
   |                              - let's call the lifetime of this reference `'1`
...
61 |           conn.transaction(|conn: &mut R::Conn| Box::pin(async move {
   |  _______________________________________________^
62 | |
63 | |             // get value and update value are different methods, but
64 | |             // should be executed as atomic transaction here
...  |
71 | |             }
72 | |         })).await
   | |__________^ returning this value requires that `'1` must outlive `'static`

error[E0597]: `inner_conn` does not live long enough
   --> src/main.rs:165:35
    |
154 |     async fn transaction<'a, F, R>(&'a mut self, callback: F) -> Result<R, String>
    |                          -- lifetime `'a` defined here
...
162 |             let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);
    |                 -------------- binding `inner_conn` declared here
...
165 |             let result = callback(&mut inner_conn).await;
    |                          ---------^^^^^^^^^^^^^^^-
    |                          |        |
    |                          |        borrowed value does not live long enough
    |                          argument requires that `inner_conn` is borrowed for `'a`
...
169 |         }.scope_boxed()).await;
    |         - `inner_conn` dropped here while still borrowed

error[E0521]: borrowed data escapes outside of closure
   --> src/main.rs:162:34
    |
149 | impl<'c> Connection for ConnectionToSpecificDbType<'_> {
    |                                                    -- lifetime `'2` appears in the `impl`'s self type
...
159 |         let result = self.as_mut().transaction(|diesel_conn| async move {
    |                                                 -----------
    |                                                 |
    |                                                 `diesel_conn` is a reference that is only valid in the closure body
    |                                                 has type `&'1 mut AsyncPgConnection`
...
162 |             let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);
    |                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |                                  |
    |                                  `diesel_conn` escapes the closure body here
    |                                  assignment requires that `'1` must outlive `'2`

error[E0311]: the parameter type `F` may not live long enough
   --> src/main.rs:159:22
    |
149 |   impl<'c> Connection for ConnectionToSpecificDbType<'_> {
    |                                                      -- the parameter type `F` must be valid for the anonymous lifetime as defined here...
...
159 |           let result = self.as_mut().transaction(|diesel_conn| async move {
    |  ______________________^
160 | |
161 | |             // wrap mut ref to diesel connection into connection object that implements Connection
162 | |             let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);
...   |
168 | |             result.map_err(|e| TransactionError::UserCaused(e))
169 | |         }.scope_boxed()).await;
    | |________________________^ ...so that the type `F` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
149 ~ impl<'a, 'c> Connection for ConnectionToSpecificDbType<'a> {
150 |     type SpecificConn = AsyncPgConnection;
...
155 |     where
156 ~         F: FnOnce(&'a mut Self) -> Pin<Box<dyn Future<Output=Result<R, String>> + Send>> + Send + 'a,
    |

error[E0309]: the parameter type `F` may not live long enough
   --> src/main.rs:159:22
    |
154 |       async fn transaction<'a, F, R>(&'a mut self, callback: F) -> Result<R, String>
    |                            -- the parameter type `F` must be valid for the lifetime `'a` as defined here...
...
159 |           let result = self.as_mut().transaction(|diesel_conn| async move {
    |  ______________________^
160 | |
161 | |             // wrap mut ref to diesel connection into connection object that implements Connection
162 | |             let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);
...   |
168 | |             result.map_err(|e| TransactionError::UserCaused(e))
169 | |         }.scope_boxed()).await;
    | |________________________^ ...so that the type `F` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
156 |         F: FnOnce(&'a mut Self) -> Pin<Box<dyn Future<Output=Result<R, String>> + Send>> + Send + 'a,
    |                                                                                                 ++++

error[E0311]: the parameter type `R` may not live long enough
   --> src/main.rs:159:22
    |
149 |   impl<'c> Connection for ConnectionToSpecificDbType<'_> {
    |                                                      -- the parameter type `R` must be valid for the anonymous lifetime as defined here...
...
159 |           let result = self.as_mut().transaction(|diesel_conn| async move {
    |  ______________________^
160 | |
161 | |             // wrap mut ref to diesel connection into connection object that implements Connection
162 | |             let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);
...   |
168 | |             result.map_err(|e| TransactionError::UserCaused(e))
169 | |         }.scope_boxed()).await;
    | |________________________^ ...so that the type `R` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
149 ~ impl<'a, 'c> Connection for ConnectionToSpecificDbType<'a> {
150 |     type SpecificConn = AsyncPgConnection;
...
156 |         F: FnOnce(&'a mut Self) -> Pin<Box<dyn Future<Output=Result<R, String>> + Send>> + Send,
157 ~         R: Send + 'a,
    |

error[E0309]: the parameter type `R` may not live long enough
   --> src/main.rs:159:22
    |
154 |       async fn transaction<'a, F, R>(&'a mut self, callback: F) -> Result<R, String>
    |                            -- the parameter type `R` must be valid for the lifetime `'a` as defined here...
...
159 |           let result = self.as_mut().transaction(|diesel_conn| async move {
    |  ______________________^
160 | |
161 | |             // wrap mut ref to diesel connection into connection object that implements Connection
162 | |             let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);
...   |
168 | |             result.map_err(|e| TransactionError::UserCaused(e))
169 | |         }.scope_boxed()).await;
    | |________________________^ ...so that the type `R` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
157 |         R: Send + 'a,
    |                 ++++

error[E0311]: the parameter type `F` may not live long enough
   --> src/main.rs:159:62
    |
149 |   impl<'c> Connection for ConnectionToSpecificDbType<'_> {
    |                                                      -- the parameter type `F` must be valid for the anonymous lifetime as defined here...
...
159 |           let result = self.as_mut().transaction(|diesel_conn| async move {
    |  ______________________________________________________________^
160 | |
161 | |             // wrap mut ref to diesel connection into connection object that implements Connection
162 | |             let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);
...   |
168 | |             result.map_err(|e| TransactionError::UserCaused(e))
169 | |         }.scope_boxed()).await;
    | |_______________________^ ...so that the type `F` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
149 ~ impl<'a, 'c> Connection for ConnectionToSpecificDbType<'a> {
150 |     type SpecificConn = AsyncPgConnection;
...
155 |     where
156 ~         F: FnOnce(&'a mut Self) -> Pin<Box<dyn Future<Output=Result<R, String>> + Send>> + Send + 'a,
    |

error[E0309]: the parameter type `F` may not live long enough
   --> src/main.rs:159:62
    |
154 |       async fn transaction<'a, F, R>(&'a mut self, callback: F) -> Result<R, String>
    |                            -- the parameter type `F` must be valid for the lifetime `'a` as defined here...
...
159 |           let result = self.as_mut().transaction(|diesel_conn| async move {
    |  ______________________________________________________________^
160 | |
161 | |             // wrap mut ref to diesel connection into connection object that implements Connection
162 | |             let mut inner_conn = ConnectionToSpecificDbType::new_from_mut(diesel_conn);
...   |
168 | |             result.map_err(|e| TransactionError::UserCaused(e))
169 | |         }.scope_boxed()).await;
    | |_______________________^ ...so that the type `F` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound
    |
156 |         F: FnOnce(&'a mut Self) -> Pin<Box<dyn Future<Output=Result<R, String>> + Send>> + Send + 'a,
    |                                                                                                 ++++

error[E0207]: the lifetime parameter `'a` is not constrained by the impl trait, self type, or predicates
  --> src/main.rs:80:6
   |
80 | impl<'a> Repository for RepositoryForSpecificDbType {
   |      ^^ unconstrained lifetime parameter

error[E0478]: lifetime bound not satisfied
   --> src/main.rs:149:10
    |
149 | impl<'c> Connection for ConnectionToSpecificDbType<'_> {
    |          ^^^^^^^^^^
    |
note: lifetime parameter instantiated with the anonymous lifetime as defined here
   --> src/main.rs:149:52
    |
149 | impl<'c> Connection for ConnectionToSpecificDbType<'_> {
    |                                                    ^^
    = note: but lifetime parameter must outlive the static lifetime

error: aborting due to 12 previous errors

Some errors have detailed explanations: E0207, E0309, E0311, E0478, E0521, E0597.
For more information about an error, try `rustc --explain E0207`.
error: could not compile `async_closure` (bin "async_closure") due to 13 previous errors
Process finished with exit code 101

Upvotes: 0

Views: 144

Answers (1)

Skru
Skru

Reputation: 115

The code finally compiles!

The partial solution in the comment by @trueequalsfalse uses transmute() to turn the mutable reference to the diesel connection into a mutable reference to a ConnectionToSpecificDbType, which removes the need for a lifetime parameter on ConnectionToSpecificDbType.

The transaction() function in diesel-async uses boxed futures and the async-trait crate to encapsulate the lifetimes of the transaction there.

I combined both approaches to wrap the diesel transaction into a generic transaction.

I also added a mock for the diesel connection at the end, so that the code can be executed without a database.

For completeness, the reason I wanted to use a generic connection is that I wanted to try out this guide on Hexagonal Architecture, but needed transactions in my Service impl, which were not covered in the guide.

# Cargo.toml

[dependencies]
tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] }
diesel = { version = "2.2.4", features = ["postgres"] }
diesel-async = { version = "0.5.0", features = ["postgres"] }
async-trait = "0.1.83"
scoped-futures = "0.1.3"

All changes to the original code in the question are marked with ***.

#![deny(unsafe_code)] // *** added; inner attribute (`#!`) so the lint covers the whole file, not just the next item
use std::future::Future;

use scoped_futures::{ScopedBoxFuture, ScopedFutureExt}; // ** changed

use crate::diesel_async_mock::{AsyncConnection, AsyncPgConnection}; // *** changed to use diesel mock


// Business logic as async functions that can be shared between threads
trait Service: Send + Sync + 'static {
    /// Raises the stored value of item `id` to `value` when `value` is larger.
    ///
    /// Resolves to the value the item holds afterwards, or to an error message.
    /// Serves as an example of an operation that needs several repository calls.
    fn maximize_item(
        &self,
        id: i32,
        value: i32,
    ) -> impl Future<Output = Result<i32, String>> + Send;
}

// Persistence port: stores the data in e.g. a database.
// The repository hands out a connection and expects its other methods to be called with it.
trait Repository: Send + Sync + 'static {
    /// Connection type produced by `connect` and consumed by the item methods.
    type Conn: Connection;

    /// Opens a connection to the data storage; resolves to an error message if that fails.
    fn connect(&self) -> impl Future<Output = Result<Self::Conn, String>> + Send;

    /// Reads the current value of item `id`.
    ///
    /// Resolves to an error message if the item doesn't exist or its value can't be read.
    fn get_item_value(
        &self,
        conn: &mut Self::Conn,
        id: i32,
    ) -> impl Future<Output = Result<i32, String>> + Send;

    /// Writes `value` as the new value of item `id`.
    ///
    /// Resolves to an error message if the item doesn't exist or can't be updated.
    fn update_item_value(
        &self,
        conn: &mut Self::Conn,
        id: i32,
        value: i32,
    ) -> impl Future<Output = Result<i32, String>> + Send;
}


// Connection to some data storage
#[async_trait::async_trait] // *** added
trait Connection: AsMut<Self::SpecificConn> + Send + Sync + 'static {
    /// Concrete driver connection that the implementor exposes through `AsMut`.
    type SpecificConn: Send + Sync;

    /// Runs `callback` as a transaction without exposing the underlying db or ORM.
    ///
    /// The callback borrows the connection only for the duration of the call (`'r`) and
    /// returns a scoped boxed future. Implementations are expected to commit when that
    /// future resolves to `Ok` and to roll back when it resolves to `Err`.
    async fn transaction<'a, R, F>(&mut self, callback: F) -> Result<R, String> // *** changed
    where
        R: Send + 'a, // *** changed
        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, String>> + Send + 'a; // *** changed
}

/// Service implementation that persists its data through a generic repository `R`.
struct ServiceImpl<R>
where
    R: Repository,
{
    /// Repository used for every storage access.
    repo: R,
}

// implementation for Service while keeping the repository generic
impl<R: Repository> Service for ServiceImpl<R> {
    /// Sets item `id` to `value` when `value` exceeds the stored value.
    ///
    /// The read and the conditional write are separate repository methods, so both are
    /// issued inside a single `transaction` call on a freshly opened connection. Resolves
    /// to the value the item holds afterwards.
    async fn maximize_item(&self, id: i32, value: i32) -> Result<i32, String> {
        // fresh connection, wrapped in the repository's generic connection type
        let mut connection = self.repo.connect().await?;

        // The closure stays inline on purpose: passing it directly lets the compiler infer
        // the higher-ranked signature that `transaction` asks for.
        connection.transaction(|conn: &mut R::Conn| async move {
            let current = self.repo.get_item_value(conn, id).await?;

            // nothing to write when the requested value does not exceed the stored one
            if value <= current {
                println!("Kept item [{id}] at {current}, because {value} is smaller"); // *** added
                return Ok::<i32, String>(current);
            }

            let updated = self.repo.update_item_value(conn, id, value).await?;
            println!("Maximized item [{id}] from {current} to {updated}"); // *** added
            Ok::<i32, String>(updated)
        }.scope_boxed()).await // *** added scope_boxed
    }
}

/// Unit struct standing in for a repository bound to one specific database type;
/// it has no fields, all behaviour comes from its `Repository` impl.
struct RepositoryForSpecificDbType;

// implement the repository for a specific database and ORM (or a mock for testing).
// In this case it is Postgres with Diesel
impl Repository for RepositoryForSpecificDbType { // *** lifetime removed
    type Conn = ConnectionToSpecificDbType; // *** lifetime removed

    /// Opens a connection and wraps it into the generic connection object.
    ///
    /// On failure the error message starts with `Connection failed` and carries the cause
    /// reported by `establish`, so the reason is not lost.
    async fn connect(&self) -> Result<ConnectionToSpecificDbType, String> {
        // URL form: postgres://user:password@host/dbname (the credentials are placeholders)
        let dsn = "postgres://root:****@localhost/my_db";
        let conn = AsyncPgConnection::establish(dsn)
            .await
            .map_err(|e| format!("Connection failed: {e}"))?;
        Ok(ConnectionToSpecificDbType::new(conn))
    }

    /// Mocked lookup: performs no database access and always reports `id * 2`.
    async fn get_item_value(&self, _conn: &mut Self::Conn, id: i32) -> Result<i32, String> {
        Ok(id * 2)
    }

    /// Mocked update: persists nothing and echoes the requested `value` back.
    async fn update_item_value(&self, _conn: &mut Self::Conn, _id: i32, value: i32) -> Result<i32, String> {
        Ok(value)
    }
}

/// Newtype around the diesel connection.
///
/// `#[repr(transparent)]` is what `new_from_mut` relies on when it turns a
/// `&mut AsyncPgConnection` into a `&mut ConnectionToSpecificDbType`.
#[repr(transparent)] // *** added
struct ConnectionToSpecificDbType(AsyncPgConnection); // *** struct def changed

impl ConnectionToSpecificDbType { // *** lifetime removed
    /// Wraps an owned diesel connection.
    fn new(conn: AsyncPgConnection) -> Self { // *** function changed
        Self(conn)
    }

    /// Views a mutably borrowed diesel connection as a mutably borrowed wrapper.
    ///
    /// Nothing is copied or moved: the returned reference points at the same connection and
    /// is tied to the lifetime of `conn` through lifetime elision.
    fn new_from_mut(conn: &mut AsyncPgConnection) -> &mut Self { // *** function changed
        // A pointer cast spells out source and target types, unlike a reference `transmute`.
        let wrapper: *mut Self = (conn as *mut AsyncPgConnection).cast();
        // SAFETY: `Self` is `#[repr(transparent)]` over `AsyncPgConnection`, so both types
        // share size, alignment and layout. `wrapper` comes from a live, unique `&mut`
        // borrow that is consumed here, so the returned reference is the only one.
        #[allow(unsafe_code)] // the one deliberate exception to the file's `deny(unsafe_code)`
        unsafe {
            &mut *wrapper
        }
    }
}

// get mutable reference to connection
impl AsMut<AsyncPgConnection> for ConnectionToSpecificDbType { // *** lifetime removed
    fn as_mut(&mut self) -> &mut AsyncPgConnection { // *** function changed
        &mut self.0
    }
}

/// Intermediate error type used inside the diesel transaction closure.
#[derive(Debug)]
enum TransactionError<E>
where
    E: Send + Sync,
{
    /// Error returned by the caller-supplied transaction callback.
    UserCaused(E),
    /// Error produced by diesel; built through the `From<diesel::result::Error>` impl.
    // *** added. The original note says this variant is not used currently.
    // NOTE(review): the `From` impl does construct it — confirm the allow is still needed.
    #[allow(dead_code)]
    DieselCaused(diesel::result::Error),
}

// A diesel transaction requires that a closure error implements `From<Error>`.
// To stay generic, handle errors in the closure and translate them into a generic error
impl<E: Send + Sync> From<diesel::result::Error> for TransactionError<E> {
    fn from(value: diesel::result::Error) -> Self {
        TransactionError::DieselCaused(value)
    }
}

#[async_trait::async_trait] // *** added
impl Connection for ConnectionToSpecificDbType { // *** lifetime removed
    type SpecificConn = AsyncPgConnection;

    /// Generic transaction built on top of the diesel connection's `transaction`.
    ///
    /// `callback` runs inside the diesel transaction closure and receives the diesel
    /// connection wrapped as a `ConnectionToSpecificDbType`. Its `String` error travels
    /// through `TransactionError::UserCaused`, because diesel requires an error type that
    /// implements `From<diesel::result::Error>`. Any error is returned as the `Debug`
    /// rendering of that `TransactionError`.
    async fn transaction<'a, R, F>(&mut self, callback: F) -> Result<R, String> // *** changed
    where
        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, String>> + Send + 'a, // *** changed
        R: Send + 'a, // *** changed
    {
        // *** inner_conn def removed
        let result = self.as_mut().transaction(|diesel_conn| async move {
            println!("Entered transaction"); // *** added

            // view the borrowed diesel connection as the wrapper type that implements Connection
            let inner_conn: &mut ConnectionToSpecificDbType = ConnectionToSpecificDbType::new_from_mut(diesel_conn); // *** changed

            // run the callback and lift its error into the intermediate error type
            callback(inner_conn).await.map_err(TransactionError::UserCaused) // *** changed
        }.scope_boxed()).await;

        println!("Left transaction"); // *** added

        // `format!` already yields a `String`, no further conversion needed
        result.map_err(|e| format!("{e:?}"))
    }
}

mod diesel_async_mock { // *** mod added
    // Stand-in for the pieces of diesel-async this example uses; nothing here talks to a database.
    use diesel::ConnectionResult;
    use scoped_futures::ScopedBoxFuture;

    /// Mock connection: a unit struct without any state.
    pub struct AsyncPgConnection;

    /// The two connection operations the example needs.
    #[async_trait::async_trait]
    pub trait AsyncConnection: Sized + Send {
        /// Creates a connection for `database_url`.
        async fn establish(database_url: &str) -> ConnectionResult<Self>;

        /// Mock transaction: awaits `callback` on this connection and returns its result
        /// unchanged; no begin, commit or rollback takes place.
        async fn transaction<'a, R, E, F>(&mut self, callback: F) -> Result<R, E>
        where
            R: Send + 'a,
            E: From<diesel::result::Error> + Send + 'a,
            F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
        {
            callback(self).await
        }
    }

    #[async_trait::async_trait]
    impl AsyncConnection for AsyncPgConnection {
        /// Always succeeds; the URL is ignored.
        async fn establish(_database_url: &str) -> ConnectionResult<Self> {
            Ok(AsyncPgConnection)
        }
    }
}

#[tokio::main]
async fn main() {
    // the service is generic over its repository; this picks the concrete implementation
    let service = ServiceImpl { repo: RepositoryForSpecificDbType };

    // (requested value, expected result) for item 5, whose mocked current value is 5 * 2 = 10
    for (requested, expected) in [(3, 10), (8, 10), (13, 13), (21, 21)] {
        assert_eq!(service.maximize_item(5, requested).await.unwrap(), expected);
    }
}

Output:

Entered transaction
Kept item [5] at 10, because 3 is smaller
Left transaction
Entered transaction
Kept item [5] at 10, because 8 is smaller
Left transaction
Entered transaction
Maximized item [5] from 10 to 13
Left transaction
Entered transaction
Maximized item [5] from 10 to 21
Left transaction

Upvotes: 0

Related Questions