Reputation: 3545
I'm reading futures-preview
0.3 sources to find out how to do "notify any" correctly. In mpsc::channel
(which is bounded), multiple senders may wait for a receipt (in case of full buffer).
Looking into the implementation of next_message
and unpark_one
, the receiver seems to only notify one sender per one receipt.
I doubt this works in presense of select!
, because select!
may lead to false notification. However, I couldn't produce a problem case.
Here's my attempt to confuse mpsc
:
[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"
[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"
and this:
#![feature(async_await, await_macro, futures_api, pin)]
use std::collections::HashSet;
use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;
async fn main2() {
let channel_len = 1;
let num_false_wait = 1000;
let num_expected_messages = 100;
let (mut send, mut recv) = channel(channel_len);
// One extra capacity per sender. Fill the extras.
await!(send.send(-2)).unwrap();
// Fill buffers
for _ in 0..channel_len {
await!(send.send(-1)).unwrap();
}
// False waits. Should resolve and produce false waiters.
for _ in 0..num_false_wait {
await!(false_wait(&send));
}
// True messages.
{
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
for i in 0..num_expected_messages {
await!(send.send(i)).unwrap();
}
Ok(())
}.boxed().compat());
}
// Drain receiver until all true messages are received.
let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
while !expects.is_empty() {
let i = await!(recv.next()).unwrap();
expects.remove(&i);
eprintln!("Received: {}", i);
}
}
// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
let (wait_send, wait_recv) = oneshot::channel();
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
let mut sending = send.send(-3);
let mut fallback = future::ready(());
select! {
sending => {
sending.unwrap();
},
fallback => {
eprintln!("future::ready is selected");
},
};
wait_send.send(()).unwrap();
Ok(())
}.boxed().compat());
await!(wait_recv).unwrap();
}
fn main() {
tokio::run(async {
await!(main2());
Ok(())
}.boxed().compat());
}
I expect this to happen:
-1
. Therefore later senders are blocked.select!
immediately completes.await!(recv.next())
, at most one waiting sender is
notified. If a false waiter is notified, no one can push to the buffer,
even if the buffer has a vacant room.Despite my expectation, the main2
async function successfully completed. Why?
Upvotes: 1
Views: 1412
Reputation: 3545
Further investigation on the futures
source code solved my problem. At last, I cannot confuse the mpsc in this way.
The point is that, the size of mpsc
is flexible and can grow more than initially specified. This behavior is mentioned in the docs:
The channel's capacity is equal to
buffer + num-senders
. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.
Yes, I've first read this before experimenting, but I couldn't figure out the importance of this at that time.
Think of a typical bounded queue implementation, where the size of a queue cannot grow more than initially specified. The spec is this:
In this situation, if the queue is full, multiple senders are waiting for a single resource (the size of the queue).
In multithread programming, this is accomplished by primitives like notify_one
. However, in futures
, this is fallible: unlike multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring the resource (due to constructions like select!
or Deadline
) Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).
mpsc
is flexibleAs pointed out above, the buffer size for futures::channel::mpsc::channel
isn't strict. The spec is summarized as:
message_queue.len() == 0
, receivers block.message_queue.len() >= buffer
, senders may block.message_queue.len() >= buffer + num_senders
, senders block.Here, num_senders
is basically the number of clones of Sender
, but more than that in some cases. More precisely, num_senders
is the number of SenderTask
s.
So, how do we avoid resource sharing? We have additional states for that:
SenderTask
) has is_parked
boolean state.parked_queue
, a queue of Arc
references to SenderTask
.The channel maintains the following invariants:
message_queue.len() <= buffer + num_parked_senders
. Note that we don't know the value of num_parked_senders
.parked_queue.len() == min(0, message_queue.len() - buffer)
parked_queue
.This is accomplished by the following algorithm:
SenderTask
from parked_queue
and, if the sender is parked, unpark it.is_parked
to be false
. If message_queue.len() < buffer
, as parked_queue.len() == 0
, all senders are unparked. Therefore we can guarantee progress in this case.is_parked
is false
, push the message to the queue anyway.message_queue.len() <= buffer
, it needs to do nothing further.message_queue.len() > buffer
, the sender is made unparked and pushed to parked_queue
.You can easily check the invariant is maintained in the algorithm above.
Surprisingly, the senders no more wait for a shared resource. Instead, A sender waits for its is_parked
state. Even if the sending task is dropped before completion, it just remains in parked_queue
for a while and doesn't block anything. How clever it is!
Upvotes: 2
Reputation: 18943
I doubt this works in presense of select!, because select! may lead to false notification.
No, You can't "confuse" a mpsc
channel using select!
:
select!
does not trigger any mspc related notification, it just return the future that finishes first.
When the message queue is full it is await!(recv.next())
that notifies one producer that a slot into the bounded channel is now available.
In other words: there are no true waiters
and false waiters
:
when a channel message queue is full the producers block and await that the receiver side consumes the enqueued messages.
Upvotes: 1