Oskar Persson
Oskar Persson

Reputation: 6765

How do I create an actix-web server that accepts both sqlx database pools and transactions?

I'm trying to set up a web application using actix-web and sqlx where each test can have its own web server and database transaction. I've tried to write my server creation so that it accepts either a database (Postgres) pool or a transaction by being generic over the Executor trait. However, I'm having problems getting both the application code and the test to compile:

// main.rs

use std::net::TcpListener;

use actix_web::dev::Server;
use actix_web::{web, App, HttpServer, Responder};
use sqlx::PgPool;

/// Connects to the local Postgres instance and returns the resulting pool.
///
/// # Panics
///
/// Panics with "Failed to create pool" if the connection attempt fails.
async fn create_pool() -> PgPool {
    let url = "postgres://postgres:postgres@localhost:5432/postgres";
    let connection_attempt = PgPool::connect(url).await;
    connection_attempt.expect("Failed to create pool")
}

/// Handler for the `/` route: replies with a fixed greeting.
async fn index() -> impl Responder {
    const GREETING: &str = "Hello World!";
    GREETING
}

/// Builds an actix-web `Server` on `listener`, registering `pool` as
/// application data and routing `GET /` to `index`. The server is returned
/// without being awaited; the caller decides how to run it.
///
/// `E` is meant to be "anything that can execute Postgres queries" so that the
/// application can pass a pool and a test can pass a transaction.
///
/// NOTE: this is the code the question is about. The compiler output quoted
/// further down rejects the `create_server(listener, pool)` call in `server`:
/// `Pool<Postgres>` satisfies neither the `Executor` bound (the help text lists
/// only `&Pool<DB>` as an implementor) nor the `Copy` bound.
pub fn create_server<'a, E: 'static>(
    listener: TcpListener,
    pool: E,
) -> Result<Server, std::io::Error>
where
    E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
    // The factory closure takes ownership of `pool` (`move`) and hands it to
    // `.data(...)` each time it is called.
    let server = HttpServer::new(move || App::new().data(pool).route("/", web::get().to(index)))
        .listen(listener)?
        .run();
    Ok(server)
}

/// Binds port 8088 on all interfaces and runs the web server until it stops.
///
/// # Errors
///
/// Returns the underlying `std::io::Error` if the port cannot be bound, if the
/// server cannot be created from the listener, or if the running server fails.
pub async fn server(pool: PgPool) -> std::io::Result<()> {
    const PORT: u16 = 8088;

    // This function already returns `io::Result`, so I/O failures are
    // propagated with `?` instead of panicking via `expect`/`unwrap`.
    let listener = TcpListener::bind(("0.0.0.0", PORT))?;

    println!("Running on port {}", PORT);

    create_server(listener, pool)?.await
}

/// Entry point: creates the connection pool and runs the server with it.
///
/// # Errors
///
/// Returns whatever I/O error `server` reports.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let pool = create_pool().await;
    // Return the server's result; the previous `server(pool).await;` discarded
    // it, so a failed server still made the process exit with `Ok(())`.
    server(pool).await
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use std::net::TcpListener;

    // Intended flow: open a transaction on a fresh pool, start a server that
    // uses that transaction instead of the pool, and run it in the background.
    // No request is sent and nothing is asserted yet.
    #[actix_rt::test]
    async fn test_foo() {
        let pool = create_pool().await;
        let mut transaction = pool.begin().await.expect("Failed to create transaction");

        // Port 0 asks the OS to pick a free port.
        let listener = TcpListener::bind("0.0.0.0:0").expect("Failed to create listener");
        // The transaction is passed by mutable reference as the executor `E`.
        let server = create_server(listener, &mut transaction).expect("Failed to create server");
        tokio::spawn(server);
    }
}
# Cargo.toml

[package]
name = "sqlx-testing"
version = "0.1.0"
authors = ["Oskar"]
edition = "2018"

[dependencies]
actix-rt = "1.1.1"
actix-web = "3.3.2"
sqlx = { version = "0.4.2", default-features = false, features = ["postgres", "runtime-async-std-native-tls"] }
tokio = "0.2.22"

Compilation output

error[E0277]: the trait bound `Pool<Postgres>: Executor<'_>` is not satisfied
  --> src\main.rs:37:29
   |
17 | pub fn create_server<'a, E: 'static>(
   |        ------------- required by a bound in this
...
22 |     E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
   |        --------------------------------------------- required by this bound in `create_server`
...
37 |     create_server(listener, pool).unwrap().await
   |                             ^^^^ the trait `Executor<'_>` is not implemented for `Pool<Postgres>`
   |
   = help: the following implementations were found:
             <&Pool<DB> as Executor<'p>>

error[E0277]: the trait bound `Pool<Postgres>: Copy` is not satisfied
  --> src\main.rs:37:29
   |
17 | pub fn create_server<'a, E: 'static>(
   |        ------------- required by a bound in this
...
22 |     E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
   |                                                        ---- required by this bound in `create_server`
...
37 |     create_server(listener, pool).unwrap().await
   |                             ^^^^ the trait `Copy` is not implemented for `Pool<Postgres>`

Upvotes: 3

Views: 5323

Answers (2)

pretzelhammer
pretzelhammer

Reputation: 15135

Trying to create a generic request handler that accepts both PgPool and &mut Transaction proved too challenging. Fortunately, you can make a PgPool instance behave as if it were a transaction by limiting it to one connection and executing a BEGIN query on it before passing it to any handlers:

/// Builds a pool limited to one connection and issues `BEGIN` on it before
/// returning it.
///
/// # Panics
///
/// Panics if the pool cannot be created or the `BEGIN` query fails.
async fn get_transaction_pool() -> PgPool {
    let single_connection = PgPoolOptions::new().max_connections(1);
    let pool = single_connection
        .connect("postgres://postgres:postgres@localhost:5432/postgres")
        .await
        .expect("Failed to create test pool.");

    let started = sqlx::query("BEGIN").execute(&pool).await;
    started.expect("Failed to BEGIN transaction.");

    pool
}

I found it useful to abstract the above into its own TestTransaction struct which looks like this:

/// A one-connection pool on which `BEGIN` has been issued, wrapped in
/// `web::Data` so it can be handed straight to request handlers.
struct TestTransaction {
    pool: web::Data<PgPool>,
}

impl TestTransaction {
    /// Connects a pool capped at one connection and issues `BEGIN` on it.
    ///
    /// Panics if connecting or the `BEGIN` query fails.
    async fn begin() -> Self {
        let single_connection = PgPoolOptions::new().max_connections(1);
        let raw_pool = single_connection
            .connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("Failed to connect to test pool.");

        let started = sqlx::query("BEGIN").execute(&raw_pool).await;
        started.expect("Failed to BEGIN transaction.");

        Self {
            pool: web::Data::new(raw_pool),
        }
    }

    /// Returns another `web::Data` handle to the same underlying pool.
    fn get_pool(&self) -> web::Data<PgPool> {
        web::Data::clone(&self.pool)
    }

    /// Issues `ROLLBACK` on the pool; panics if the query fails.
    async fn rollback(&self) {
        let undone = sqlx::query("ROLLBACK")
            .execute(self.pool.as_ref())
            .await;
        undone.expect("Failed to ROLLBACK transaction.");
    }
}

Furthermore, you don't need to start HttpServer in each of your unit tests, you can just test the handlers directly following this simple template:

// Template: begin a test transaction, call the handler under test directly
// with the transaction's pool, check its output, then roll back.
#[actix_rt::test]
async fn test_case() {
    let transaction = TestTransaction::begin().await;

    let pool = transaction.get_pool();
    let actual = request_handler_func(pool).await;
    assert_eq!(actual, "some expected value here");

    transaction.rollback().await;
}

Here's the full main.rs with some comments:

use actix_web::{web, App, HttpServer};
use sqlx::{PgPool, Row};
use std::net::TcpListener;

/// Inserts one row into `items` and returns a message containing the id the
/// database assigned to it.
///
/// # Panics
///
/// Panics with "Failed to create item." if the insert fails.
async fn create_item(pool: web::Data<PgPool>) -> String {
    let insert = sqlx::query("INSERT INTO items (label) VALUES ('label text') RETURNING id");
    let row = insert
        .fetch_one(pool.as_ref())
        .await
        .expect("Failed to create item.");
    let id: i64 = row.get("id");
    format!("created item with id {}", id)
}

/// Counts the rows in `items` and returns the total as a message.
///
/// # Panics
///
/// Panics with "Failed to fetch item count." if the query fails.
async fn count_items(pool: web::Data<PgPool>) -> String {
    let select = sqlx::query("SELECT count(*) FROM items");
    let row = select
        .fetch_one(pool.as_ref())
        .await
        .expect("Failed to fetch item count.");
    let count: i64 = row.get("count");
    format!("{} items in db", count)
}

/// Entry point: connects to Postgres, makes sure the `items` table exists,
/// then serves `/create-item` and `/count-items` on port 8080.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
    let pool = PgPool::connect(database_url)
        .await
        .expect("Failed to create pool.");

    let create_table =
        sqlx::query("CREATE TABLE IF NOT EXISTS items (id BIGSERIAL PRIMARY KEY, label TEXT)");
    create_table
        .execute(&pool)
        .await
        .expect("Failed to create items table.");

    let listener = TcpListener::bind("0.0.0.0:8080").expect("Failed to create listener");

    println!("Listening on http://localhost:8080");
    println!("Try endpoints GET /create-item & GET /count-items");

    // Each call to the factory registers a fresh clone of the pool as app data.
    let app_factory = move || {
        App::new()
            .data(pool.clone())
            .route("/create-item", web::get().to(create_item))
            .route("/count-items", web::get().to(count_items))
    };

    let server = HttpServer::new(app_factory).listen(listener)?.run();
    server.await
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use sqlx::postgres::PgPoolOptions;

    // A one-connection pool on which BEGIN has been issued, wrapped in
    // `web::Data` so it can be passed directly to the request handlers.
    struct TestTransaction {
        pool: web::Data<PgPool>,
    }

    impl TestTransaction {
        // Connects a pool capped at one connection, issues BEGIN, and then
        // recreates the items table so each test starts from an empty table.
        // Panics if any of these steps fails.
        async fn begin() -> Self {
            let pool = PgPoolOptions::new()
                .max_connections(1)
                .connect("postgres://postgres:postgres@localhost:5432/postgres")
                .await
                .expect("Failed to create test pool.");

            sqlx::query("BEGIN")
                .execute(&pool)
                .await
                .expect("Failed to BEGIN transaction.");

            // below 2 queries are necessary so that tests are always
            // run from within the same environment conditions, i.e.
            // the items table should be empty

            sqlx::query("DROP TABLE IF EXISTS items")
                .execute(&pool)
                .await
                .expect("Failed to drop test items table.");

            sqlx::query("CREATE TABLE IF NOT EXISTS items (id BIGSERIAL PRIMARY KEY, label TEXT)")
                .execute(&pool)
                .await
                .expect("Failed to create test items table.");

            TestTransaction {
                pool: web::Data::new(pool),
            }
        }

        // Returns another `web::Data` handle to the same underlying pool.
        fn get_pool(&self) -> web::Data<PgPool> {
            self.pool.clone()
        }

        // Issues ROLLBACK on the pool; panics if the query fails.
        async fn rollback(&self) {
            sqlx::query("ROLLBACK")
                .execute(self.pool.as_ref())
                .await
                .expect("Failed to ROLLBACK transaction.");
        }
    }

    // all tests below are run in parallel and are
    // isolated within their own transaction instances

    // One insert: expects id 1 and a count of 1.
    #[actix_rt::test]
    async fn create_and_count_1_items() {
        let tx = TestTransaction::begin().await;

        let response = create_item(tx.get_pool()).await;
        assert_eq!(response, "created item with id 1");

        let response = count_items(tx.get_pool()).await;
        assert_eq!(response, "1 items in db");

        tx.rollback().await;
    }

    // Two inserts: expects ids 1 and 2 and a count of 2.
    #[actix_rt::test]
    async fn create_and_count_2_items() {
        let tx = TestTransaction::begin().await;

        let response = create_item(tx.get_pool()).await;
        assert_eq!(response, "created item with id 1");
        let response = create_item(tx.get_pool()).await;
        assert_eq!(response, "created item with id 2");

        let response = count_items(tx.get_pool()).await;
        assert_eq!(response, "2 items in db");

        tx.rollback().await;
    }

    // Three inserts: expects ids 1, 2 and 3 and a count of 3.
    #[actix_rt::test]
    async fn create_and_count_3_items() {
        let tx = TestTransaction::begin().await;

        let response = create_item(tx.get_pool()).await;
        assert_eq!(response, "created item with id 1");
        let response = create_item(tx.get_pool()).await;
        assert_eq!(response, "created item with id 2");
        let response = create_item(tx.get_pool()).await;
        assert_eq!(response, "created item with id 3");

        let response = count_items(tx.get_pool()).await;
        assert_eq!(response, "3 items in db");

        tx.rollback().await;
    }
}

You can of course run the tests with cargo test but you can also run cargo run and visit the endpoints in your browser at:

  • http://localhost:8080/create-item
  • http://localhost:8080/count-items

And although those endpoints modify the DB, if you shut down the server and run cargo test again, the tests will still pass! This is because the TestTransaction struct effectively truncates the items table within the begin function, which makes all the unit tests reproducible regardless of what's actually in the DB. It does so safely, inside a transaction that is rolled back, so no data is modified in the DB itself.

Upvotes: 5

Augustin
Augustin

Reputation: 591

Trying to be generic over the Executor trait is a bit overkill. You should probably just use a pool of size 1 in your test and manually call BEGIN and ROLLBACK.

#[actix_rt::test]
async fn test_endpoint() {
    // build with only one connection
    let single_connection = PgPoolOptions::new().max_connections(1);
    let pool = single_connection
        .connect("postgres://postgres:postgres@localhost:5432/postgres")
        .await
        .expect("pool failed");

    let started = sqlx::query("BEGIN").execute(&pool).await;
    started.expect("BEGIN failed");

    // The server gets its own handle; `pool` stays here for the ROLLBACK.
    let server_pool = pool.clone();
    let listener = TcpListener::bind("0.0.0.0:0").expect("Failed to create listener");
    let app_factory = move || App::new().data(server_pool.clone()).service(one);
    let server = HttpServer::new(app_factory)
        .listen(listener)
        .expect("fail to bind")
        .run();
    tokio::spawn(server);

    // your test

    let undone = sqlx::query("ROLLBACK").execute(&pool).await;
    undone.expect("ROLLBACK failed");
}

This way you don't have to change your application code to accommodate your tests:

// main.rs
use actix_web::{get, web, App, HttpServer, Responder};
use sqlx::{postgres::PgPool, Row};
use std::net::TcpListener;

/// `GET /one`: runs `select 1 as id` on the pool and returns the value as text.
///
/// Panics if the query fails or the `id` column cannot be read as an `i32`.
#[get("/one")]
async fn one(pool: web::Data<PgPool>) -> impl Responder {
    let db: &PgPool = pool.get_ref();
    let row = sqlx::query("select 1 as id").fetch_one(db).await.unwrap();
    let id: i32 = row.try_get("id").unwrap();
    format!("{:?}", id)
}

/// Entry point: connects to Postgres and serves the `one` endpoint on port 8088.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    const PORT: usize = 8088;

    let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
    let pool = PgPool::connect(database_url)
        .await
        .expect("Failed to create pool");

    let address = format!("0.0.0.0:{}", PORT);
    let listener = TcpListener::bind(address).expect("Failed to create listener");

    println!("Running on port {}", PORT);

    // Each call to the factory registers a fresh clone of the pool as app data.
    let app_factory = move || App::new().data(pool.clone()).service(one);
    HttpServer::new(app_factory).listen(listener)?.run().await
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use sqlx::postgres::PgPoolOptions;

    // Starts the real application on an OS-chosen port, backed by a
    // one-connection pool on which BEGIN has been issued, then issues ROLLBACK
    // once the test body is done.
    #[actix_rt::test]
    async fn test_endpoint() {
        // build with only one connection
        let pool = PgPoolOptions::new()
            .max_connections(1)
            .connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("pool failed");

        sqlx::query("BEGIN")
            .execute(&pool)
            .await
            .expect("BEGIN failed");

        // keep a handle for the ROLLBACK below; `pool` itself is moved into
        // the server's factory closure
        let saved_pool = pool.clone();

        let listener = TcpListener::bind("0.0.0.0:0").expect("Failed to create listener");
        let server = HttpServer::new(move || App::new().data(pool.clone()).service(one))
            .listen(listener)
            .expect("fail to bind")
            .run();
        tokio::spawn(server);

        // your test

        sqlx::query("ROLLBACK")
            .execute(&saved_pool)
            .await
            .expect("ROLLBACK failed");
    }

    // Creates a table and inserts a row between BEGIN and ROLLBACK, reading
    // the row count before rolling back.
    #[actix_rt::test]
    async fn test_rollback() {
        let pool = PgPoolOptions::new()
            .max_connections(1)
            .connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("pool failed");

        sqlx::query("BEGIN")
            .execute(&pool)
            .await
            .expect("BEGIN failed");

        sqlx::query("CREATE TABLE  IF NOT EXISTS test (id SERIAL, name TEXT)")
            .execute(&pool)
            .await
            .expect("CREATE TABLE test failed");

        sqlx::query("INSERT INTO test (name) VALUES ('bob')")
            .execute(&pool)
            .await
            .expect("INSERT test failed");

        let count: i64 = sqlx::query("SELECT COUNT(id) as count from test")
            .fetch_one(&pool)
            .await
            .expect("SELECT COUNT test failed")
            .try_get("count")
            .unwrap();
        sqlx::query("ROLLBACK")
            .execute(&pool)
            .await
            .expect("ROLLBACK failed");

        // checked only after the ROLLBACK has been issued, so a failing
        // assertion cannot skip it
        assert_eq!(count, 1);
    }

    // Counter-example: the same steps as test_rollback but on a different
    // table (test2) and without BEGIN/ROLLBACK.
    #[actix_rt::test]
    async fn test_no_rollback() {
        let pool = PgPoolOptions::new()
            .max_connections(1)
            .connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("pool failed");

        sqlx::query("CREATE TABLE  IF NOT EXISTS test2 (id SERIAL, name TEXT)")
            .execute(&pool)
            .await
            .expect("CREATE TABLE test failed");

        sqlx::query("INSERT INTO test2 (name) VALUES ('bob')")
            .execute(&pool)
            .await
            .expect("INSERT test failed");

        let count: i64 = sqlx::query("SELECT COUNT(id) as count from test2")
            .fetch_one(&pool)
            .await
            .expect("SELECT COUNT failed")
            .try_get("count")
            .unwrap();

        // this will fail the second time you run the test: nothing here is
        // rolled back, so the row inserted by the previous run is presumably
        // still in test2 and the count is no longer 1
        assert_eq!(count, 1);
    }
}

Upvotes: 9

Related Questions