Reputation: 1071
let full_results: Vec<String> = (0..num_threads)
.into_par_iter()
.flat_map(|thread_id| {
let mut local_results: Vec<String> = Vec::new();
rt.block_on(async {
let chunk_size = 1000;
let total_chunks = transactions.len() as u32 / chunk_size;
let chunks_per_thread = total_chunks / num_threads;
let extra_chunks = total_chunks % num_threads;
let start_index = thread_id * chunks_per_thread + std::cmp::min(thread_id, extra_chunks);
let end_index = (thread_id + 1) * chunks_per_thread + std::cmp::min(thread_id + 1, extra_chunks);
for chunk_index in start_index..end_index {
let chunk_start = (chunk_index * chunk_size) as usize;
let chunk_end = ((chunk_index + 1) * chunk_size) as usize;
let chunk_end = std::cmp::min(((chunk_index + 1) * chunk_size) as usize, transactions.len());
if let Some(chunk) = transactions.get(chunk_start..chunk_end) {
let owned_chunk: Vec<Value> = chunk.iter().cloned().collect();
let result = verify_transactions(owned_chunk, db.clone(), stop_flag.clone()).await;
for individual_string in result {
println!("{:?}", individual_string);
local_results.extend(individual_string);
}
}
};
});
local_results
}).collect();
verified_transactions.blocking_send(full_results);
});
Transactions is json containing 1 million objects. I need to process these as fast as possible. I have broken the json into 1000 chunks each. verify_transactions contains a single blocking element which checks to see if there were any errors at any point in the process to terminate early if errors occur, otherwise the code is non-blocking. It is imperative that the verify_transactions be async code as other functions need access to this code at the same time. The processing of transactions is straight forward, each chunk is broken into a per transaction and a series of test such as hashing, timestamp checking, and the likes take place.
For some reason, this code is taking over 16 hours to run and the CPU is mostly in a state of idle almost never rising above a 2% usage.
I am very new to rust and I just don't know how to get past this. I know 1 million objects is a lot, but still 16 hours with nearly no CPU usage with CPU nearly always idle is certainly not correct.
I need to optimize this in such a way it will use the majority of my CPUs but I cannot figure out how.
//verify transactions
async fn verify_transactions(parsed_data: Vec<Value>, db: Db, stop_flag: Arc<Mutex<bool>>) -> Result<Vec<String>, String> {
let mut results: Vec<String> = Vec::new();
let mut error_flag = false;
if parsed_data.len() == 1 {
let transaction = &parsed_data[0];
if *stop_flag.lock().await {
return Err("Stop signal received.".to_string());
}
// If there is only one transaction, process it directly
match verify_transaction(transaction.clone(), &db).await {
Ok(result) => {
// Push the actual result returned by the verify_transaction function
results.push(result);
}
Err(error) => {
// Handle the error, set the flag, and break out of the loop
println!("Error: {}", error);
*stop_flag.lock().await = true;
return Err("Error occurred in processing transactions.".to_string());
}
}
} else {
for transaction in &parsed_data {
if *stop_flag.lock().await {
return Err("Stop signal received.".to_string());
}
match verify_transaction(transaction.clone(), &db).await {
Ok(result) => {
println!("result: {}", result);
// Push the actual result returned by the verify_transaction function
results.push(result);
}
Err(error) => {
// Handle the error, set the flag, and break out of the loop
println!("Error: {}", error);
*stop_flag.lock().await = true;
return Err("Error occurred in processing transactions.".to_string());
}
}
}
}
Ok(results)
}
Upvotes: 0
Views: 44