R Sun
R Sun

Reputation: 1669

Get result from both of `async` functions

Given sample weather API & it's corresponding cache wrapper (StreamCache), which needs to always return the most recent value the API has delivered.

My approach: (since subscribe needs to inform fetch to make it functional & since both of these futures will be polled simultaneously, in theory it should work)

let temperature_data = join!(api.subscribe(), api.fetch());

How do I retrieve result from both of these async functions ? I tried using join!, but somehow, it waits forever.

use async_trait::async_trait;
use futures::join;
use futures::stream::BoxStream;
use std::{collections::HashMap, result::Result, sync::{Arc, Mutex}};
type City = String;
type Temperature = u64;

#[async_trait]
pub trait Api: Send + Sync + 'static {
    async fn fetch(&self) -> Result<HashMap<City, Temperature>, String>;
    async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>>;
}

pub struct StreamCache {
    results: Arc<Mutex<HashMap<String, u64>>>,
}

impl StreamCache {
    pub async fn new(api: impl Api) -> Self {
        let instance = Self {
            results: Arc::new(Mutex::new(HashMap::new())),
        };

        let temperature_data = join!(api.subscribe(), api.fetch());

        // instance.update_in_background(api);
        instance
    }

    pub fn get(&self, key: &str) -> Option<u64> {
        let results = self.results.lock().expect("poisoned");
        results.get(key).copied()
    }

    pub fn update_in_background(&self, api: impl Api) {
        // todo: perform action
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::Notify;

    use futures::{future, stream::select, FutureExt, StreamExt};
    use maplit::hashmap;

    use super::*;

    #[derive(Default)]
    struct TestApi {
        signal: Arc<Notify>,
    }

    #[async_trait]
    impl Api for TestApi {
        async fn fetch(&self) -> Result<HashMap<City, Temperature>, String> {
            // fetch is slow an may get delayed until after we receive the first updates
            self.signal.notified().await;
            Ok(hashmap! {
                "Berlin".to_string() => 29,
                "Paris".to_string() => 31,
            })
        }

        async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
            let results = vec![
                Ok(("London".to_string(), 27)),
                Ok(("Paris".to_string(), 32)),
            ];
            select(
                futures::stream::iter(results),
                async {
                    self.signal.notify_one();
                    future::pending().await
                }.into_stream(),
            ).boxed()
        }
    }

    #[tokio::test]
    async fn works() {
        let cache = StreamCache::new(TestApi::default()).await;

        // Allow cache to update
        // time::sleep(Duration::from_millis(100)).await;

        // assert_eq!(cache.get("Berlin"), Some(29));
        // assert_eq!(cache.get("London"), Some(27));
        // assert_eq!(cache.get("Paris"), Some(32));
    }
}

Upvotes: 0

Views: 84

Answers (2)

R Sun
R Sun

Reputation: 1669

I moved some parts to update_in_background()

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::StreamExt;
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};
type City = String;
type Temperature = u64;

#[async_trait]
pub trait Api: Send + Sync + 'static {
    async fn fetch(&self) -> Result<HashMap<City, Temperature>, String>;
    async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>>;
}

#[derive(Debug, Clone)]
pub struct StreamCache {
    results: Arc<Mutex<HashMap<String, u64>>>,
}

impl StreamCache {
    pub async fn new(api: impl Api) -> Self {
        let instance = Self {
            results: Arc::new(Mutex::new(HashMap::new())),
        };

        let api = Arc::new(api);

        {
            let instance = instance.clone();
            let api = api.clone();
            tokio::spawn(async move {
                StreamCache::update_in_background(instance, api).await;
            });
        }

        // fetch a map
        // we take the subscribe data as the latest
        {
            let fetch_map = api.fetch().await.unwrap();
            let mut map = instance.results.lock().unwrap();
            for (k, v) in fetch_map {
                if map.contains_key(&k) {
                    continue;
                }
                map.insert(k, v);
            }
        }

        instance
    }

    pub async fn update_in_background(instance: Self, api: Arc<impl Api + Sized>) {
        let mut sub = api.subscribe().await;
        while let Some(Ok((k, v))) = sub.next().await {
            let mut map = instance.results.lock().unwrap();
            map.insert(k, v);
        }
    }

    pub fn get(&self, key: &str) -> Option<u64> {
        let results = self.results.lock().expect("poisoned");
        results.get(key).copied()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::sync::Notify;

    use futures::{future, stream::select, FutureExt, StreamExt};
    use maplit::hashmap;

    use super::*;

    #[derive(Default)]
    struct TestApi {
        signal: Arc<Notify>,
    }

    #[async_trait]
    impl Api for TestApi {
        async fn fetch(&self) -> Result<HashMap<City, Temperature>, String> {
            // fetch is slow an may get delayed until after we receive the first updates
            self.signal.notified().await;
            Ok(hashmap! {
                "Berlin".to_string() => 29,
                "Paris".to_string() => 31,
            })
        }

        async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
            let results = vec![
                Ok(("London".to_string(), 27)),
                Ok(("Paris".to_string(), 32)),
            ];

            select(
                futures::stream::iter(results),
                async {
                    self.signal.notify_one();
                    future::pending().await
                }
                .into_stream(),
            )
            .boxed()
        }
    }

    #[tokio::test]
    async fn works() {
        let cache = StreamCache::new(TestApi::default()).await;

        // Allow cache to update
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(cache.get("Berlin"), Some(29));
        assert_eq!(cache.get("London"), Some(27));
        assert_eq!(cache.get("Paris"), Some(32));
    }
}

Upvotes: 0

啊鹿Dizzyi
啊鹿Dizzyi

Reputation: 1050

Does it make sense to change your subscribe() implementation to

async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
    let results = vec![
        Ok(("London".to_string(), 27)),
        Ok(("Paris".to_string(), 32)),
    ];
    self.signal.notify_one();
    futures::stream::iter(results).boxed()
}

Edit

To change StreamCache instead of Api

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::StreamExt;
use std::{
    collections::HashMap,
    result::Result,
    sync::{Arc, Mutex},
};
type City = String;
type Temperature = u64;

#[async_trait]
pub trait Api: Send + Sync + 'static {
    async fn fetch(&self) -> Result<HashMap<City, Temperature>, String>;
    async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>>;
}

#[derive(Debug, Clone)]
pub struct StreamCache {
    results: Arc<Mutex<HashMap<String, u64>>>,
}


impl StreamCache {
    pub async fn new<T: Api>(api: T) -> Self {
        let instance = Self {
            results: Arc::new(Mutex::new(HashMap::new())),
        };

        let api =  Arc::new(api);
        
        {
            let instance = instance.clone();
            let api = api.clone();
            // Spawn a new tokio thread to handle subscribed data
            tokio::spawn(async move {
                let mut sub = api.subscribe().await;
                while let Some(Ok((k, v))) = sub.next().await {
                    let mut map = instance.results.lock().unwrap();
                    map.insert(k, v);
                    println!("{:?}", map);
                }
            });
        }

        // fetch a map
        // *we take the subscribe data as the latest*
        {
            let fetch_map = api.fetch().await.unwrap();
            let mut map = instance.results.lock().unwrap();
            for (k, v) in fetch_map {
                if map.contains_key(&k) {
                    continue;
                }
                map.insert(k, v);
            }
            println!("{:?}", map)
        }

        instance
    }

    pub fn get(&self, key: &str) -> Option<u64> {
        let results = self.results.lock().expect("poisoned");
        results.get(key).copied()
    }

    pub fn update_in_background(self, api: impl Api) {
        // todo: perform action
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::Notify;

    use futures::{future, stream::select, FutureExt, StreamExt};
    use maplit::hashmap;

    use super::*;

    #[derive(Default)]
    struct TestApi {
        signal: Arc<Notify>,
    }

    #[async_trait]
    impl Api for TestApi {
        async fn fetch(&self) -> Result<HashMap<City, Temperature>, String> {
            // fetch is slow an may get delayed until after we receive the first updates
            self.signal.notified().await;
            Ok(hashmap! {
                "Berlin".to_string() => 29,
                "Paris".to_string() => 31,
            })
        }

        async fn subscribe(&self) -> BoxStream<Result<(City, Temperature), String>> {
            let results = vec![
                Ok(("London".to_string(), 27)),
                Ok(("Paris".to_string(), 32)),
            ];
            select(
                futures::stream::iter(results),
                async {
                    self.signal.notify_one();
                    future::pending().await
                }
                .into_stream(),
            )
            .boxed()
        }
    }

    #[tokio::test]
    async fn works() {
        let cache = StreamCache::new(TestApi::default()).await;

        // Allow cache to update
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        assert_eq!(cache.get("Berlin"), Some(29));
        assert_eq!(cache.get("London"), Some(27));
        assert_eq!(cache.get("Paris"), Some(32));
    }
}
fn main() {}

Output

PS ***> cargo test -- --nocapture
   Compiling *** v0.1.0 (***)

.
.
.

running 1 test
{"London": 27}
{"Paris": 32, "London": 27}
{"Paris": 32, "Berlin": 29, "London": 27}
test tests::works ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.10s
*/

Upvotes: 0

Related Questions