Reputation: 13
Context:
I am writing a web server where we process different segments. I want to cache these segments in separate files (a segment can be up to 10 MB in size). Something like this:
pub async fn segment_handler(segment: String) {
    if is_cached(&segment) {
        return get_from_cache(segment)
    }
    // Do some computation to get the result.
    let result = do_some_large_computation(segment);
    // Cache this result to a file.
    let file_name = &format!("./cache/{}", &segment);
    fs::create(file_name);
    fs::write(file_name, result).expect("Unable to write file");
    result
}
Now, since segment_handler can be called by multiple threads with different segment values, is fs::write thread safe? If not, a single global mutex is not a good fit: segment: String can be different for each call, so one mutex would serialize unrelated writes and make performance worse. I need something like a mutex, but keyed on segment: String only. What is the solution to this problem?
Environment:
Upvotes: 1
Views: 2474
Reputation: 4249
The code you have posted does not compile because there is no such thing as fs::create. Luckily, you don't need it at all: the fs::write function creates the file for you.
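As a small illustration of that point, here is a sketch that uses only the standard library. The helper name cache_segment and its parameters are hypothetical and not part of the question's code. Note that fs::write creates (or truncates) the file itself, but not its parent directory, so the sketch creates the cache directory first.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical helper: store `bytes` for `segment` inside `cache_dir`.
pub fn cache_segment(cache_dir: &Path, segment: &str, bytes: &[u8]) -> io::Result<()> {
    // fs::write does not create missing parent directories, so do that here.
    fs::create_dir_all(cache_dir)?;
    // fs::write creates the file if it is missing and replaces its contents otherwise.
    fs::write(cache_dir.join(segment), bytes)
}
```

In an async handler, a blocking helper like this should still be run through spawn_blocking or replaced by its tokio::fs equivalents.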
At least on Linux, calling fs::write concurrently on the same path from several different threads will result in the file containing the contents passed to one of the fs::write calls. Note that if you use the existence of the file to determine whether you need to read from the cache or recompute the value, you may end up with multiple threads recomputing the same value, then all of them writing it to the file.
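If a reader observing a half-written cache file is a concern, one common approach (not part of the original answer) is to write to a temporary file in the same directory and then rename it over the final path. On POSIX systems, rename replaces the destination atomically, so a reader sees either the old file or the complete new one. A minimal sketch using only the standard library follows; write_atomically and the ".tmp.<pid>.<n>" suffix scheme are hypothetical names chosen for illustration.

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter so that concurrent writers never share a temp file name.
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Hypothetical helper: write `contents` to a unique temporary file next to
/// `path`, then rename it into place.
pub fn write_atomically(path: &Path, contents: &[u8]) -> io::Result<()> {
    let n = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
    // Build "<path>.tmp.<pid>.<n>" in the same directory, so the rename
    // stays on one filesystem.
    let mut tmp = path.as_os_str().to_owned();
    tmp.push(format!(".tmp.{}.{}", std::process::id(), n));
    fs::write(&tmp, contents)?;
    // Replace the destination with the fully written temporary file.
    fs::rename(&tmp, path)
}
```

This only protects readers from partial files. It does not stop several threads from computing the same value.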
You should be aware that since you are using async/await, you should not call the std::fs module directly from async code, since it blocks the thread. You should either use tokio::fs::write like this:
// CachedType is the result type used in the full example further down.
pub async fn segment_handler(segment: String) -> CachedType {
    if is_cached(&segment) {
        return get_from_cache(segment);
    }
    // Do some computation to get the result.
    let result = do_some_large_computation(&segment);
    // Cache this result to a file.
    let file_name = format!("./cache/{}", &segment);
    tokio::fs::write(&file_name, result.to_slice()).await.expect("Unable to write file");
    result
}
Another correct option is to use spawn_blocking like this:
pub async fn segment_handler(segment: String) -> CachedType {
    if is_cached(&segment) {
        return get_from_cache(segment);
    }
    tokio::task::spawn_blocking(move || {
        // Do some computation to get the result.
        let result = do_some_large_computation(&segment);
        // Cache this result to a file. The closure runs on a blocking thread,
        // so the synchronous std::fs is used here (.await is not available).
        let file_name = format!("./cache/{}", &segment);
        std::fs::write(&file_name, result.to_slice()).expect("Unable to write file");
        result
    }).await.expect("Panic in spawn_blocking")
}
You can read more about why you must properly handle blocking like this in the section "CPU-bound tasks and blocking code" of Tokio's documentation:
Tokio is able to concurrently run many tasks on a few threads by repeatedly swapping the currently running task on each thread. However, this kind of swapping can only happen at .await points, so code that spends a long time without reaching an .await will prevent other tasks from running. To combat this, Tokio provides two kinds of threads: Core threads and blocking threads. The core threads are where all asynchronous code runs, and Tokio will by default spawn one for each CPU core. The blocking threads are spawned on demand, and can be used to run blocking code that would otherwise block other tasks from running.
To spawn a blocking task, you should use the spawn_blocking function.
Note that I have linked to Tokio 0.2's documentation as warp does not yet support Tokio 0.3.
To prevent having the value computed several times if the function is called several times before the first call finishes, you can use a technique based on a HashMap stored behind a mutex like this:
use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::broadcast;

pub struct Cache {
    inner: Mutex<Inner>,
}

struct Inner {
    // Values that have already been computed.
    cached: HashMap<String, CachedType>,
    // Keys currently being computed, with a channel to notify waiters.
    pending: HashMap<String, broadcast::Sender<CachedType>>,
}

pub enum TryCached {
    Exists(CachedType),
    Pending(broadcast::Receiver<CachedType>),
    New(),
}

impl Cache {
    pub fn try_get(&self, key: &str) -> TryCached {
        let mut inner = self.inner.lock().unwrap();
        if let Some(value) = inner.cached.get(key) {
            // To avoid a deep clone, store HashMap<String, Arc<CachedType>>
            // and clone the Arc instead.
            TryCached::Exists(value.clone())
        } else if let Some(pending) = inner.pending.get(key) {
            TryCached::Pending(pending.subscribe())
        } else {
            let (channel, _) = broadcast::channel(1);
            inner.pending.insert(key.to_string(), channel);
            TryCached::New()
        }
    }

    pub fn put_computed(&self, key: String, value: CachedType) {
        let mut inner = self.inner.lock().unwrap();
        if let Some(chan) = inner.pending.remove(&key) {
            // send only fails when nobody is waiting, which is fine here.
            let _ = chan.send(value.clone());
        }
        inner.cached.insert(key, value);
    }
}
The method can then be implemented as a call to try_get that does different things depending on the value of the returned enum.
pub async fn segment_handler(cache: &Cache, segment: String) -> CachedType {
    match cache.try_get(&segment) {
        TryCached::Exists(value) => value,
        TryCached::Pending(mut chan) => chan.recv().await.expect("Sender dropped without sending"),
        TryCached::New() => {
            let (segment, value) = tokio::task::spawn_blocking(move || {
                // Do some computation to get the result.
                let result = do_some_large_computation(&segment);
                // Cache this result to a file.
                let file_name = &format!("./cache/{}", &segment);
                std::fs::write(file_name, result.to_slice()).expect("Unable to write file");
                (segment, result)
            })
            .await
            .expect("Panic in spawn_blocking");
            cache.put_computed(segment, value.clone());
            value
        }
    }
}
The full example can be found on the playground.
This method is fully thread-safe due to the mutex. Note that this uses a synchronous mutex rather than an async mutex. To read more about why this is okay, see the shared state chapter from the Tokio tutorial.
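For purely blocking code (for example inside spawn_blocking or on plain threads), the same "compute each key only once" idea can also be sketched with the standard library alone. This is an alternative illustration, not the answer's implementation: OnceCache and get_or_compute are hypothetical names, the cached value is assumed to be a Vec<u8>, and std::sync::OnceLock requires Rust 1.70 or newer. The map lock is held only long enough to find or create the per-key slot, so different keys do not block each other during the computation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical per-key cache: each key gets its own OnceLock slot.
pub struct OnceCache {
    slots: Mutex<HashMap<String, Arc<OnceLock<Vec<u8>>>>>,
}

impl OnceCache {
    pub fn new() -> Self {
        OnceCache { slots: Mutex::new(HashMap::new()) }
    }

    /// Returns the cached value for `key`, running `compute` at most once per key.
    pub fn get_or_compute<F>(&self, key: &str, compute: F) -> Vec<u8>
    where
        F: FnOnce() -> Vec<u8>,
    {
        let mut slots = self.slots.lock().unwrap();
        let slot = slots.entry(key.to_string()).or_default().clone();
        // Release the map lock before the (possibly slow) computation.
        drop(slots);
        // The first caller for this key runs `compute`; concurrent callers
        // for the same key block until the value is ready.
        slot.get_or_init(compute).clone()
    }
}
```

Because get_or_init blocks the calling thread while another thread computes the value, this sketch should not be called directly from async code.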
Upvotes: 6