Reputation: 69
I have two streams, slow_stream and fast_stream. I am trying to write fast_stream to S3 in a bucket named by the result of slow_stream.
Ideally, I'd do something like:
while let Some(slow) = slow_stream.next().await {
    while let Some(fast) = fast_stream.next().await {
        tokio::spawn(async_put_in_s3_bucket(fast, slow));
    }
}
If fast_stream runs indefinitely, does control flow ever return to the outer loop here? Is this the correct way to handle this? Is it faster than using two tokio::spawn calls and a mutex to communicate between them? Is there a more idiomatic Rust way to accomplish this? It seems like there is a way to use a codec to turn the fast stream into a ByteStream directly, but I'm still missing something about how to get it into S3 with the info from slow_stream.
Upvotes: 1
Views: 128
Reputation: 66
Have you tried using tokio::select! on the two streams, and letting the outer loop handle the naming of the bucket? The purpose of your code is a bit unclear, but I can try to provide some pseudocode.
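let mut bucket = Default::default();
loop {
    tokio::select! {
        // A new item from the slow stream replaces the current bucket name.
        Some(slow) = slow_stream.next() => {
            bucket = slow;
        }
        // Each fast item goes to whichever bucket is current (assumes the bucket type is Clone).
        Some(fast) = fast_stream.next() => {
            // You could instead push this into a FuturesUnordered and poll it in the select.
            tokio::spawn(async_put_in_s3_bucket(fast, bucket.clone()));
        }
        // Both streams have ended.
        else => break,
    }
}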
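One thing to watch with the loop above: until slow_stream yields its first value, bucket is still Default::default(), so early fast items would be sent to an empty bucket name. Below is a rough, synchronous sketch of the pairing logic only, with no tokio or S3 involved. The Event enum and the route function are hypothetical names I made up to model "whichever stream fires next", and holding early items back until the first bucket arrives is just one possible policy, not necessarily what you want.

```rust
/// Hypothetical merged view of the two streams, in arrival order.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// A new bucket name from the slow stream.
    Slow(String),
    /// An item from the fast stream that should be uploaded.
    Fast(u32),
}

/// Pairs each fast item with the most recent bucket name.
/// Fast items that arrive before any bucket is known are held back
/// and assigned to the first bucket name once it shows up.
pub fn route(events: &[Event]) -> Vec<(String, u32)> {
    let mut bucket: Option<String> = None;
    let mut pending: Vec<u32> = Vec::new();
    let mut uploads: Vec<(String, u32)> = Vec::new();

    for event in events {
        match event {
            Event::Slow(name) => {
                // Flush anything that was waiting for a bucket.
                // After the first bucket is set, `pending` stays empty.
                for item in pending.drain(..) {
                    uploads.push((name.clone(), item));
                }
                bucket = Some(name.clone());
            }
            Event::Fast(item) => match &bucket {
                // A bucket is known: this is where the upload would be spawned.
                Some(name) => uploads.push((name.clone(), *item)),
                // No bucket yet: hold the item instead of using a default name.
                None => pending.push(*item),
            },
        }
    }
    uploads
}
```

In the real select loop, the Slow arm corresponds to the slow_stream.next() branch and the Fast arm to the fast_stream.next() branch. Keeping bucket as an Option makes the "no bucket yet" case explicit instead of hiding it behind a default value.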
If you can add more clarity, that would allow me to propose a better solution.
Upvotes: 2