tyhi
tyhi

Reputation: 23

Lifetime issue returning a stream with hyper/azure sdk

I've been trying to return a stream, as I've done with tokio::fs::File however I am getting a lifetime error on the BlobClient.

error[E0597]: `blob` does not live long enough
  --> src\main.rs:20:27
   |
20 |     let stream = Box::pin(blob.get().stream(128));
   |                           ^^^^^^^^^^
   |                           |
   |                           borrowed value does not live long enough
   |                           argument requires that `blob` is borrowed for `'static`
...
24 | }
   | - `blob` dropped here while still borrowed

I've tried a bunch of different ways of handling the stream but I can't navigate around this lifetime error. I'm sure it might be something simple that I just keep overlooking. Thanks for any assistance.

Here's a repo of what I'm trying to do:

use std::{convert::Infallible, net::SocketAddr};

use azure_core::new_http_client;
use azure_storage::{
    blob::prelude::{AsBlobClient, AsContainerClient},
    clients::{AsStorageClient, StorageAccountClient},
};
use futures::TryStreamExt;
use hyper::{
    service::{make_service_fn, service_fn},
    Body, Request, Response, Server,
};

async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    let http_client = new_http_client();
    let storage_account_client = StorageAccountClient::new_access_key(http_client.clone(), "account", "key");
    let storage_client = storage_account_client.as_storage_client();
    let blob = storage_client.as_container_client("container").as_blob_client("blob");

    let stream = Box::pin(blob.get().stream(128));
    let s = stream.and_then(|f| futures::future::ok(f.data));

    Ok(Response::new(Body::wrap_stream(s)))
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let make_service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

    let server = Server::bind(&addr).serve(make_service);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

Upvotes: 2

Views: 100

Answers (1)

Brian Bowman
Brian Bowman

Reputation: 941

The problem is that the stream borrows from blob, but the wrap_stream() function only accepts 'static streams. A workaround is to construct the stream in a new task, and send back the stream items through a channel. The following helper function helps implement this approach:

/// Creates a `'static` stream from a closure returning a (possibly) non-`'static` stream.
///
/// The stream items, closure, and closure argument are still restricted to being `'static`,
/// but the closure can return a non-`'static` stream that borrows from the closure
/// argument.
fn make_static_stream<T, F, U>(
    make_stream: F,
    mut make_stream_arg: U,
) -> impl Stream<Item = T>
where
    T: Send + 'static,
    F: FnOnce(&mut U) -> BoxStream<'_, T> + Send + 'static,
    U: Send + 'static,
{
    let (mut tx, rx) = futures::channel::mpsc::channel(0);
    tokio::spawn(async move {
        let stream = make_stream(&mut make_stream_arg);
        pin_mut!(stream);
        while let Some(item) = stream.next().await {
            if tx.feed(item).await.is_err() {
                // Receiver dropped
                break;
            }
        }

        tx.close().await.ok();
    });

    rx
}

Here's how you would use it in the original code:

    // ...

    let stream = make_static_stream(
        |blob| blob.get().stream(128).map_ok(|x| x.data).boxed(),
        blob,
    );

    Ok(Response::new(Body::wrap_stream(stream)))
}

Upvotes: 2

Related Questions