Wren

Reputation: 69

Communicate between long and short running async streams w/o mutex?

I have two streams, slow_stream and fast_stream. I am trying to write the items from fast_stream to S3, in a bucket named by the latest result of slow_stream.

Ideally, I'd do something like,

while let Some(slow) = slow_stream.next().await {
  while let Some(fast) = fast_stream.next().await {
    tokio::spawn(async_put_in_s3_bucket(fast,slow));
  }
}

If fast_stream runs indefinitely, does control flow ever return to the outer loop here? Is this the correct way to handle this? Is it faster than using two tokio::spawn calls and a mutex to communicate between them? Is there a more Rust-y way to accomplish this? It seems like there is a way to use a codec to turn a fast stream into a ByteStream directly, but I'm still missing something about how to get it into S3 with the info from slow_stream.

Upvotes: 1

Views: 128

Answers (1)

Zander

Reputation: 66

Have you tried using tokio::select! on the two streams in a single loop, with the slow branch updating the bucket name? The purpose of your code is a bit unclear, but I can try to provide some pseudocode.

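use futures::StreamExt; // provides `.next()`; tokio_stream::StreamExt also works

// Latest bucket name seen on slow_stream; starts at the type's default value.
let mut bucket = Default::default();

loop {
  tokio::select! {
    // A new bucket name arrived: remember it for subsequent fast items.
    Some(slow) = slow_stream.next() => {
      bucket = slow;
    }
    Some(fast) = fast_stream.next() => {
      // You can push this into a FuturesUnordered and poll it in the select! instead.
      // `bucket` must be Clone, since every spawned task takes its own copy.
      tokio::spawn(async_put_in_s3_bucket(fast, bucket.clone()));
    }
    // Both streams returned None: nothing left to do.
    else => break,
  }
}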
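For contrast, the nested loops in the question stay inside the inner loop until fast_stream ends, so with an endless fast_stream the outer loop never sees a second bucket name. The select! loop avoids that by treating both streams as one interleaved sequence of events and keeping the latest bucket name in a local variable, so no mutex is needed. Here is a minimal synchronous sketch of just that bookkeeping, with no async runtime involved. The Event enum and the route function are hypothetical names made up for illustration, and skipping fast items that arrive before any bucket name is an assumption, not something the question specifies.

```rust
/// One item from either stream, tagged by its source (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Slow(String), // a new bucket name from slow_stream
    Fast(u32),    // a payload from fast_stream
}

/// Pairs each fast item with the most recent bucket name.
/// Fast items seen before any bucket name are skipped (an assumption).
fn route(events: impl IntoIterator<Item = Event>) -> Vec<(String, u32)> {
    let mut bucket: Option<String> = None;
    let mut uploads = Vec::new();
    for event in events {
        match event {
            // Same role as the slow branch of the select! loop.
            Event::Slow(name) => bucket = Some(name),
            // Same role as the fast branch: use whatever bucket is current.
            Event::Fast(item) => {
                if let Some(name) = &bucket {
                    uploads.push((name.clone(), item));
                }
            }
        }
    }
    uploads
}

fn main() {
    let events = vec![
        Event::Fast(0),
        Event::Slow("a".to_string()),
        Event::Fast(1),
        Event::Fast(2),
        Event::Slow("b".to_string()),
        Event::Fast(3),
    ];
    for (bucket, item) in route(events) {
        println!("put {item} in {bucket}");
    }
}
```

In the async version, the for loop becomes the loop around tokio::select! and each pushed pair becomes one spawned upload.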

If you can add more clarity, that would allow me to propose a better solution.

Upvotes: 2
