januw a
januw a

Reputation: 2248

How to create other threads in main function

I am using Redis streams with actix-web 4 and want to start the stream consumer from the main function. This is my current code:

[dependencies]
actix-web = "4"
tokio = { version = "1", features = ["full"] }
redis = { version = "0.21", features = [
  # "cluster",
  "tokio-comp",
  "tokio-native-tls-comp",
] }
/// Application entry point: runs the project's init helpers, hands the Redis
/// stream consumer to `web::block`, then serves HTTP on `0.0.0.0:7777`.
///
/// Errors from `bind` and from the running server are propagated via `?`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Project-local helpers; presumably they load the env file — confirm in utils::init.
    utils::init::init_envfile();
    env_logger::init_from_env(env_logger::Env::new());

    let redis_pool = utils::init::init_redis_pool();
    let mysql_pool = utils::init::init_mysql_pool();

    // The synchronous consumer is passed to web::block; the resulting future is
    // only awaited after the server has stopped (see the end of this function).
    // NOTE(review): web::block is expected to run the closure on a blocking
    // thread pool rather than on the async executor — confirm in the actix-web docs.
    let redist_stream_consumer = web::block(redis_stream_group);

    // The factory closure is `move`, so it owns both pools and clones them into
    // each App it builds.
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(redis_pool.clone()))
            .app_data(web::Data::new(mysql_pool.clone()))
       .service(web::scope("/api").configure(controller::api::config))
    })
    .bind(("0.0.0.0", 7777))?
    .run()
    .await?;

    // NOTE(review): redis_stream_group() is a `loop` with no `break`, so this
    // await can only complete if the consumer panics. This looks like the reason
    // the process does not exit after Ctrl+C — confirm.
    redist_stream_consumer.await.unwrap();
    Ok(())
}

/// Blocking consumer for the `s.order` stream, reading as consumer `c1` of
/// group `g1` over a synchronous connection to `redis://127.0.0.1/`.
///
/// Each iteration reads at most one new entry, logs it, and acknowledges it.
/// The loop has no exit condition, so this function never returns normally.
///
/// # Panics
///
/// Panics if the client or connection cannot be created, or if a read or an
/// acknowledgement fails.
fn redis_stream_group() {
    const STREAM: &str = "s.order";
    const GROUP: &str = "g1";
    const CONSUMER: &str = "c1";

    let client = redis::Client::open("redis://127.0.0.1/").expect("client");
    let mut conn = client.get_connection().expect("con");

    // The outcome of group creation is deliberately discarded
    // (presumably because the group may already exist — confirm).
    let _: Result<(), _> = conn.xgroup_create_mkstream(STREAM, GROUP, "$");

    let read_opts = StreamReadOptions::default()
        .group(GROUP, CONSUMER)
        .count(1)
        .block(0);

    loop {
        let reply: StreamReadReply = conn
            .xread_options(&[STREAM], &[">"], &read_opts)
            .expect("read err");

        for StreamKey { key: stream, ids: entries } in reply.keys {
            // Log every entry and remember its id so the batch is acknowledged in one call.
            let mut pending_ids: Vec<&String> = Vec::with_capacity(entries.len());
            for entry in &entries {
                log::info!("id:{} | key:{} | data:{:?}", entry.id, stream, entry.map);
                pending_ids.push(&entry.id);
            }

            let _: usize = conn.xack(stream, GROUP, &pending_ids).expect("ack err");
        }
    }
}

When I run the program with cargo r, it starts normally and receives the messages that are sent, but when I press Ctrl+C the program does not exit.

Also, I'm not sure whether using web::block in the main function is correct. Is there a better way to run child threads?


UPDATE: I tried using tokio::spawn instead, and it seems to work:

/// Variant of `main` that starts two asynchronous stream consumers with
/// `tokio::spawn` before building the HTTP server (the server part is elided
/// in this snippet).
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let redis_pool = utils::init::init_redis_pool();
    let mysql_pool = utils::init::init_mysql_pool();

    // One task per consumer index (1 and 2); each task gets its own clone of the pool.
    for consumer_index in 1..=2 {
        let c_redis_pool = redis_pool.clone();
        // The JoinHandle returned by tokio::spawn is dropped, so the tasks are
        // never awaited by main.
        tokio::spawn(async move {
            let mut con = c_redis_pool.get().await.unwrap();
            let key = "s.order";
            let group_name = "g1";
            // Consumer names are "c1" and "c2", derived from the loop index.
            let consumer_name = &format!("c{consumer_index}");
            // The outcome of group creation is deliberately discarded.
            let _: Result<(), _> = con.xgroup_create_mkstream(key, group_name, "$").await;

            // NOTE(review): block(5000) is presumably a 5000 ms read timeout,
            // unlike block(0) in the synchronous version — confirm in the redis-rs docs.
            let opts = StreamReadOptions::default()
                .group(group_name, consumer_name)
                .count(1)
                .block(5000);

            // No exit condition: the task ends only if one of the expects panics.
            loop {
                let read_reply: StreamReadReply = con
                    .xread_options(&[key], &[">"], &opts)
                    .await
                    .expect("err");
                for StreamKey { key, ids } in read_reply.keys {
                    for StreamId { id, map } in &ids {
                        log::info!(
                            "consumer: {} | id:{} | key:{} | data:{:?}",
                            consumer_name,
                            id,
                            key,
                            map
                        );
                    }
                    // Acknowledge every id that was just logged in a single call.
                    let id_strs: Vec<&String> =
                        ids.iter().map(|StreamId { id, map: _ }| id).collect();
                    let _: usize = con
                        .xack(key, group_name, &id_strs)
                        .await
                        .expect("ack err");
                }
            }
        });
    }

    // The remainder of the server setup is elided in the original snippet.
    let serve = HttpServer::new(move || {
   ...
}

Upvotes: 0

Views: 1213

Answers (1)

Esdeseserdt
Esdeseserdt

Reputation: 301

This can be done with the standard library by using std::thread: spawn a thread and put whatever you want that thread to do in a closure.

/// Spawns a worker thread, does some work on the main thread, and then waits
/// for the worker to finish.
///
/// # Panics
///
/// Panics if the worker thread itself panicked.
fn main() {
    let worker = thread::spawn(|| {
        println!("doing things in the thread!");
    });
    println!("doing things outside the thread.... how boring");
    // Without this join, main could return first; the process would then exit
    // and the worker might never get to print its message.
    worker.join().expect("worker thread panicked");
}

If you want to pass data between them, you can use std::sync::mpsc to transfer data between the threads safely and quickly. Create a channel with let (sender, receiver) = mpsc::channel(); like so:

/// Sends one message from a spawned thread to the main thread over an mpsc
/// channel and prints it.
///
/// # Panics
///
/// Panics if the message cannot be sent or received (the other end of the
/// channel has been dropped).
fn main() {
    let (sender, receiver) = mpsc::channel();
    // `move` transfers ownership of the sending half into the worker thread.
    thread::spawn(move || {
        let message = String::from("This message is from the thread");
        sender.send(message).unwrap();
    });
    // recv() blocks the main thread until the worker has sent its message.
    let letter = receiver.recv().unwrap();
    println!("{letter}");
}

Note that the main thread proceeds as normal until it reaches .recv(). There it blocks until the other thread sends a message, or returns an error if the sender is dropped without sending anything.

In your example, you could do something like this:

use std::sync::mpsc;
use std::thread;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    utils::init::init_envfile();
    env_logger::init_from_env(env_logger::Env::new());

    let port = get_env!("ACTIX_PORT", u16);
    log::info!(
        "starting HTTP server at http://{}:{}",
        local_ipaddress::get().unwrap_or("localhost".to_string()),
        port
    );
    let redis_pool = utils::init::init_redis_pool();
    let mysql_pool = utils::init::init_mysql_pool();

    let (consumer_sender,consumer_listener) = mpsc::channel();
    thread::spawn(move || {
        consumer_sender.send(redis_stream_group()).expect("You probably want to handle this case, but I'm too lazy");
    });
    let serve = HttpServer::new(move || {
        let app_state = utils::init::AppState {
            app_name: get_env!("APP_NAME", String),
            pwd_secret: get_env!("PWD_SECRET", String),
            jwt_secret: get_env!("JWT_SECRET", String),
            jwt_exp: get_env!("JWT_EXP", i64),
        };
        App::new()
            .app_data(web::Data::new(awc::Client::default()))
            .app_data(web::Data::new(app_state))
            .app_data(web::Data::new(redis_pool.clone()))
            .app_data(web::Data::new(mysql_pool.clone()))
            .wrap(actix_cors::Cors::default().allowed_origin_fn(|_, _| true))
            .service(web::scope("/chat").configure(controller::chat::config))
            .service(web::scope("/ws").configure(controller::ws::config))
            .service(web::scope("/api").configure(controller::api::config))
    });
    if cfg!(debug_assertions) {
        serve.bind(("0.0.0.0", port))?
    } else {
        let p = format!("/tmp/{}.socket", get_env!("APP_NAME", String));
        let r = serve.bind_uds(&p)?;
        let mut perms = std::fs::metadata(&p)?.permissions();
        perms.set_readonly(false);
        std::fs::set_permissions(&p, perms)?;
        r
    }
    .run()
    .await?;
    let consumer = consumer_listener.recv().unwrap();
    //then put things to do with the consumer here, or not idc
    Ok(())
}

/// Runs a blocking read/acknowledge loop on the `s.order` stream as consumer
/// `c1` in group `g1`, using a synchronous connection to `redis://127.0.0.1/`.
///
/// Every entry that is read is logged and then acknowledged. There is no
/// `break` in the loop, so the function does not return normally.
///
/// # Panics
///
/// Panics when the client or connection cannot be set up, and when a read or
/// an acknowledgement returns an error.
fn redis_stream_group() {
    const STREAM: &str = "s.order";
    const GROUP: &str = "g1";
    const CONSUMER: &str = "c1";

    let client = redis::Client::open("redis://127.0.0.1/").expect("client");
    let mut conn = client.get_connection().expect("con");

    // Group creation result is intentionally ignored
    // (presumably the group may already exist — confirm).
    let _: Result<(), _> = conn.xgroup_create_mkstream(STREAM, GROUP, "$");

    let read_opts = StreamReadOptions::default()
        .group(GROUP, CONSUMER)
        .count(1)
        .block(0);

    loop {
        let reply: StreamReadReply = conn
            .xread_options(&[STREAM], &[">"], &read_opts)
            .expect("read err");

        for StreamKey { key: stream, ids: entries } in reply.keys {
            // Log first, collecting ids along the way, then acknowledge the whole batch.
            let mut pending_ids: Vec<&String> = Vec::with_capacity(entries.len());
            for entry in &entries {
                log::info!("id:{} | key:{} | data:{:?}", entry.id, stream, entry.map);
                pending_ids.push(&entry.id);
            }

            let _: usize = conn.xack(stream, GROUP, &pending_ids).expect("ack err");
        }
    }
}

Upvotes: 1

Related Questions