Reputation: 2248
I am using Redis Streams in an actix-web 4 application, and I want to create the consumer in the `main` function. This is my current code:
[dependencies]
actix-web = "4"
tokio = { version = "1", features = ["full"] }
redis = { version = "0.21", features = [
# "cluster",
"tokio-comp",
"tokio-native-tls-comp",
] }
/// Application entry point: initialises configuration, logging and the connection pools,
/// starts the Redis stream consumer via `web::block`, then serves HTTP on port 7777.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
utils::init::init_envfile();
env_logger::init_from_env(env_logger::Env::new());
let redis_pool = utils::init::init_redis_pool();
let mysql_pool = utils::init::init_mysql_pool();
// Hand the synchronous consumer function to `web::block`; the returned value is only
// awaited after the server below has finished.
// NOTE(review): presumably this runs `redis_stream_group` on actix-web's blocking thread pool — confirm.
let redist_stream_consumer = web::block(redis_stream_group);
// The factory closure owns both pools (`move`) and clones them into each `App` it builds.
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(redis_pool.clone()))
.app_data(web::Data::new(mysql_pool.clone()))
.service(web::scope("/api").configure(controller::api::config))
})
.bind(("0.0.0.0", 7777))?
.run()
.await?;
// Reached only once the server future above has completed. `redis_stream_group` contains a
// `loop` with no `break` or `return`, so this await can only finish if that function panics.
// NOTE(review): this looks like the reason the process does not exit on Ctrl+C — confirm.
redist_stream_consumer.await.unwrap();
Ok(())
}
/// Consumes the `s.order` Redis stream as consumer `c1` of group `g1`, forever.
///
/// Opens a synchronous connection to `redis://127.0.0.1/`, tries to create the group (and
/// stream) while ignoring the outcome, then loops: read with `count(1)` / `block(0)`, log every
/// returned entry, and acknowledge the returned ids.
///
/// # Panics
///
/// Panics if the client or connection cannot be created, or if a read or an acknowledge fails.
fn redis_stream_group() {
    const STREAM: &str = "s.order";
    const GROUP: &str = "g1";
    const CONSUMER: &str = "c1";

    let redis_client = redis::Client::open("redis://127.0.0.1/").expect("client");
    let mut conn = redis_client.get_connection().expect("con");

    // The result of the group creation is discarded on purpose.
    let _: Result<(), _> = conn.xgroup_create_mkstream(STREAM, GROUP, "$");

    let read_opts = StreamReadOptions::default()
        .group(GROUP, CONSUMER)
        .count(1)
        .block(0);

    loop {
        let reply: StreamReadReply = conn
            .xread_options(&[STREAM], &[">"], &read_opts)
            .expect("read err");

        for StreamKey { key: stream_key, ids: entries } in reply.keys {
            entries.iter().for_each(|entry| {
                log::info!("id:{} | key:{} | data:{:?}", entry.id, stream_key, entry.map);
            });

            // Acknowledge everything that was just logged for this stream.
            let entry_ids: Vec<&String> = entries.iter().map(|entry| &entry.id).collect();
            let _: usize = conn.xack(stream_key, GROUP, &entry_ids).expect("ack err");
        }
    }
}
When I use `cargo r`, I can run the program normally and get the sent messages, but when I press Ctrl+C, I can't exit the program.
Also, I'm not sure whether using `web::block` in the `main` function is correct, or whether there is a better way to run child threads.
UPDATE: I tried using `tokio::spawn`, and it seems to work:
// Updated entry point: the stream consumers run as spawned async tasks instead of going
// through `web::block`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let redis_pool = utils::init::init_redis_pool();
let mysql_pool = utils::init::init_mysql_pool();
// Start two consumers, named `c1` and `c2`; each task gets its own clone of the Redis pool.
for consumer_index in 1..=2 {
let c_redis_pool = redis_pool.clone();
// The value returned by `tokio::spawn` is not kept, so nothing here awaits these tasks.
tokio::spawn(async move {
// Failing to obtain a pooled connection panics inside this task (`unwrap`).
let mut con = c_redis_pool.get().await.unwrap();
let key = "s.order";
let group_name = "g1";
let consumer_name = &format!("c{consumer_index}");
// The outcome of creating the group is ignored.
// NOTE(review): presumably because the group may already exist — confirm.
let _: Result<(), _> = con.xgroup_create_mkstream(key, group_name, "$").await;
// Unlike the blocking version's `block(0)`, reads here use `block(5000)`.
// NOTE(review): presumably a 5000 ms timeout — confirm against the redis crate docs.
let opts = StreamReadOptions::default()
.group(group_name, consumer_name)
.count(1)
.block(5000);
loop {
// A read error panics via `expect("err")`.
let read_reply: StreamReadReply = con
.xread_options(&[key], &[">"], &opts)
.await
.expect("err");
// Inside this loop, `key` is the stream name taken from the reply and shadows the outer `key`.
for StreamKey { key, ids } in read_reply.keys {
for StreamId { id, map } in &ids {
log::info!(
"consumer: {} | id:{} | key:{} | data:{:?}",
consumer_name,
id,
key,
map
);
}
// Acknowledge every id that was just logged for this stream.
let id_strs: Vec<&String> =
ids.iter().map(|StreamId { id, map: _ }| id).collect();
let _: usize = con
.xack(key, group_name, &id_strs)
.await
.expect("ack err");
}
}
});
}
// The server setup and the rest of `main` are elided (`...`) in the original post.
let serve = HttpServer::new(move || {
...
}
Upvotes: 0
Views: 1213
Reputation: 301
This can be done with the standard library by using `std::thread`: create the thread and put whatever you want the other thread to do in a closure.
/// Runs a closure on a second thread while the main thread keeps working, then waits for it.
fn main() {
    // Keep the handle so the worker can be joined below.
    let worker = thread::spawn(|| {
        println!("doing things in the thread!");
    });
    println!("doing things outside the thread.... how boring");
    // Without this join, `main` could return (ending the process) before the worker has printed.
    worker.join().expect("worker thread panicked");
}
If you want to pass data between them, you can use `std::sync::mpsc` to transfer data between the threads safely and quickly. Create a channel with `let (item_one, item_two) = mpsc::channel();`, like so:
/// Sends a message from a spawned thread to the main thread over an `mpsc` channel.
fn main() {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        let message = String::from("This message is from the thread");
        // `send` only fails when the receiver is gone; `main` keeps it alive until `recv` below.
        sender.send(message).unwrap();
    });
    // Blocks until the worker has sent its message.
    let letter = receiver.recv().unwrap();
    println!("{letter}");
}
Note that the main thread proceeds as normal until it comes to the `.recv()`, at which point it either receives the data from the thread or waits until the other thread has sent it.
In your example you could do something like:
use std::sync::mpsc;
use std::thread;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
utils::init::init_envfile();
env_logger::init_from_env(env_logger::Env::new());
let port = get_env!("ACTIX_PORT", u16);
log::info!(
"starting HTTP server at http://{}:{}",
local_ipaddress::get().unwrap_or("localhost".to_string()),
port
);
let redis_pool = utils::init::init_redis_pool();
let mysql_pool = utils::init::init_mysql_pool();
let (consumer_sender,consumer_listener) = mpsc::channel();
thread::spawn(move || {
consumer_sender.send(redis_stream_group()).expect("You probably want to handle this case, but I'm too lazy");
});
let serve = HttpServer::new(move || {
let app_state = utils::init::AppState {
app_name: get_env!("APP_NAME", String),
pwd_secret: get_env!("PWD_SECRET", String),
jwt_secret: get_env!("JWT_SECRET", String),
jwt_exp: get_env!("JWT_EXP", i64),
};
App::new()
.app_data(web::Data::new(awc::Client::default()))
.app_data(web::Data::new(app_state))
.app_data(web::Data::new(redis_pool.clone()))
.app_data(web::Data::new(mysql_pool.clone()))
.wrap(actix_cors::Cors::default().allowed_origin_fn(|_, _| true))
.service(web::scope("/chat").configure(controller::chat::config))
.service(web::scope("/ws").configure(controller::ws::config))
.service(web::scope("/api").configure(controller::api::config))
});
if cfg!(debug_assertions) {
serve.bind(("0.0.0.0", port))?
} else {
let p = format!("/tmp/{}.socket", get_env!("APP_NAME", String));
let r = serve.bind_uds(&p)?;
let mut perms = std::fs::metadata(&p)?.permissions();
perms.set_readonly(false);
std::fs::set_permissions(&p, perms)?;
r
}
.run()
.await?;
let consumer = consumer_listener.recv().unwrap();
//then put things to do with the consumer here, or not idc
Ok(())
}
/// Runs the blocking consumer loop for stream `s.order`, group `g1`, consumer `c1`.
///
/// Connects to `redis://127.0.0.1/`, attempts to create the group (and stream) without looking
/// at the result, and then repeatedly reads with `count(1)` / `block(0)`, logging and
/// acknowledging whatever each read returns. The loop has no exit path.
///
/// # Panics
///
/// Panics when the client or connection cannot be set up, or when a read or acknowledge fails.
fn redis_stream_group() {
    let stream_name = "s.order";
    let group = "g1";
    let consumer = "c1";

    let redis_client = redis::Client::open("redis://127.0.0.1/").expect("client");
    let mut connection = redis_client.get_connection().expect("con");

    // Whatever the group creation returns is intentionally dropped.
    let _: Result<(), _> = connection.xgroup_create_mkstream(stream_name, group, "$");

    let options = StreamReadOptions::default()
        .group(group, consumer)
        .count(1)
        .block(0);

    loop {
        let reply: StreamReadReply = connection
            .xread_options(&[stream_name], &[">"], &options)
            .expect("read err");

        for StreamKey { key: source, ids: batch } in reply.keys {
            for item in &batch {
                log::info!("id:{} | key:{} | data:{:?}", item.id, source, item.map);
            }

            // Collect the ids of the batch so they can be acknowledged in one call.
            let pending: Vec<&String> = batch.iter().map(|item| &item.id).collect();
            let _: usize = connection.xack(source, group, &pending).expect("ack err");
        }
    }
}
Upvotes: 1