Masaki Hara

Reputation: 3545

Why can `futures::channel::mpsc` notify just one sender?

I'm reading the futures-preview 0.3 sources to find out how to do "notify any" correctly. In mpsc::channel (which is bounded), multiple senders may be waiting for a message to be received (when the buffer is full).

Looking into the implementation of next_message and unpark_one, the receiver seems to notify only one sender per message received.

I doubt this works in the presence of select!, because select! may lead to a false notification. However, I couldn't produce a problem case.

Here's my attempt to confuse mpsc:

[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"

and this:

#![feature(async_await, await_macro, futures_api, pin)]

use std::collections::HashSet;

use futures::prelude::*;

use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;

async fn main2() {
    let channel_len = 1;
    let num_false_wait = 1000;
    let num_expected_messages = 100;

    let (mut send, mut recv) = channel(channel_len);
    // One extra capacity per sender. Fill the extras.
    await!(send.send(-2)).unwrap();

    // Fill buffers
    for _ in 0..channel_len {
        await!(send.send(-1)).unwrap();
    }

    // False waits. Should resolve and produce false waiters.
    for _ in 0..num_false_wait {
        await!(false_wait(&send));
    }

    // True messages.
    {
        let mut send = send.clone();
        await!(send.send(-2)).unwrap();
        tokio::spawn(async move {
            for i in 0..num_expected_messages {
                await!(send.send(i)).unwrap();
            }
            Ok(())
        }.boxed().compat());
    }

    // Drain receiver until all true messages are received.
    let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
    while !expects.is_empty() {
        let i = await!(recv.next()).unwrap();
        expects.remove(&i);
        eprintln!("Received: {}", i);
    }
}

// If the channel is full, this produces a false waiter.
async fn false_wait(send: &Sender<i32>) {
    let (wait_send, wait_recv) = oneshot::channel();
    let mut send = send.clone();
    await!(send.send(-2)).unwrap();
    tokio::spawn(async move {
        let mut sending = send.send(-3);
        let mut fallback = future::ready(());
        select! {
            sending => {
                sending.unwrap();
            },
            fallback => {
                eprintln!("future::ready is selected");
            },
        };
        wait_send.send(()).unwrap();
        Ok(())
    }.boxed().compat());
    await!(wait_recv).unwrap();
}

fn main() {
    tokio::run(async {
        await!(main2());
        Ok(())
    }.boxed().compat());
}

I expect this to happen:

  1. The buffer is filled with -1, so later senders are blocked.
  2. There are both "true waiters" and "false waiters". False waiters already exited, because the other arm of select! immediately completes.
  3. In each call to await!(recv.next()), at most one waiting sender is notified. If a false waiter is notified, no one can push to the buffer, even though it has a vacant slot.
  4. If all elements are drained without a true waiter being notified, the entire system is stuck.

Contrary to my expectation, the main2 async function completed successfully. Why?

Upvotes: 1

Views: 1412

Answers (2)

Masaki Hara

Reputation: 3545

Further investigation of the futures source code solved my problem. In the end, I cannot confuse the mpsc in this way.

The point is that the capacity of an mpsc channel is flexible and can grow beyond the initially specified size. This behavior is mentioned in the docs:

The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.

I had read this before experimenting, but I couldn't figure out its importance at the time.

Problem with fixed buffer

Think of a typical bounded queue implementation, where the queue cannot grow beyond its initially specified size. The spec is:

  • When the queue is empty, receivers block.
  • When the queue is full (that is, the size is hitting the bound), senders block.

In this situation, if the queue is full, multiple senders are waiting for a single resource (room in the queue).

In multithreaded programming, this is accomplished by primitives like notify_one. However, in futures this is fallible: unlike in multithreaded programming, the notified task doesn't necessarily make use of the resource, because the task may already have given up acquiring it (due to constructions like select! or Deadline). Then the spec is simply broken (the queue isn't full, but all alive senders are blocked).
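To make this failure mode concrete, here is an illustrative toy (plain std, not any real channel implementation; all names are made up) in which the single wakeup per pop is handed to a waiter that has already given up:

use std::collections::VecDeque;

fn main() {
    let bound = 1;
    let mut queue: VecDeque<i32> = VecDeque::new();
    queue.push_back(-1); // the queue is full; further senders must wait

    // A waiter that has given up (false), queued ahead of a live one (true).
    let mut waiters: VecDeque<bool> = VecDeque::from(vec![false, true]);

    // The receiver pops a message and notifies exactly one waiter.
    let _ = queue.pop_front();
    if let Some(still_interested) = waiters.pop_front() {
        if !still_interested {
            // The wakeup is swallowed: the queue now has a free slot,
            // but the live waiter is never notified.
            println!("notification lost; live waiters left: {}", waiters.len());
        }
    }
    assert!(queue.len() < bound && !waiters.is_empty()); // the spec is broken
}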

mpsc is flexible

As pointed out above, the buffer size of futures::channel::mpsc::channel isn't strict. The spec is summarized as:

  • When message_queue.len() == 0, receivers block.
  • When message_queue.len() >= buffer, senders may block.
  • When message_queue.len() >= buffer + num_senders, senders block.

Here, num_senders is basically the number of clones of the Sender, but it can be more than that in some cases. More precisely, num_senders is the number of SenderTasks.
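For example, here is a minimal sketch of the buffer + num_senders rule. It uses Sender::try_send, which rejects a message while the sender is parked (exact error details may vary between futures releases):

use futures::channel::mpsc::channel;

fn main() {
    let (mut a, _recv) = channel::<i32>(1); // buffer = 1
    let mut b = a.clone(); // num_senders = 2, so capacity = 3

    a.try_send(1).unwrap(); // the "first come, first serve" buffer slot
    a.try_send(2).unwrap(); // `a`'s guaranteed slot; `a` is now parked
    assert!(a.try_send(3).is_err()); // `a` must wait until it is unparked
    b.try_send(4).unwrap(); // `b` still has its own guaranteed slot
    assert!(b.try_send(5).is_err()); // now `b` is parked too
}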

So, how do we avoid sharing the resource? There is additional state for that:

  • Each sender (an instance of SenderTask) has is_parked boolean state.
  • The channel has another queue called parked_queue, a queue of Arc references to SenderTask.

The channel maintains the following invariants:

  • message_queue.len() <= buffer + num_parked_senders. Note that we don't know the value of num_parked_senders.
  • parked_queue.len() == max(0, message_queue.len() - buffer)
  • Each parked sender has at least one entry in parked_queue.

This is accomplished by the following algorithm:

  • For receiving,
    • it pops a SenderTask off parked_queue and, if that sender is parked, unparks it.
  • For sending,
    • It always waits for is_parked to be false. If message_queue.len() < buffer, then parked_queue.len() == 0, so all senders are unparked and progress is guaranteed in this case.
    • Once is_parked is false, it pushes the message to the queue, even if message_queue.len() >= buffer.
    • After that, if message_queue.len() <= buffer, nothing more needs to be done.
    • If message_queue.len() > buffer, the sender is parked (is_parked is set to true) and pushed to parked_queue.

You can easily check that the invariants are maintained by the algorithm above.
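As a rough sanity check, here is a single-threaded toy model of this bookkeeping (not the actual futures source; SenderTask, parked_queue and friends merely mirror the names above), with the invariant asserted after every operation. Its main function also plays out the "dropped waiter" scenario discussed next:

use std::cell::Cell;
use std::collections::VecDeque;
use std::rc::Rc;

struct SenderTask {
    is_parked: Cell<bool>,
}

struct Channel {
    buffer: usize,
    message_queue: VecDeque<i32>,
    parked_queue: VecDeque<Rc<SenderTask>>,
}

impl Channel {
    fn new(buffer: usize) -> Self {
        Channel {
            buffer,
            message_queue: VecDeque::new(),
            parked_queue: VecDeque::new(),
        }
    }

    // Sending: refuse while parked; otherwise push, and park ourselves
    // if the push went past `buffer`.
    fn send(&mut self, task: &Rc<SenderTask>, msg: i32) -> Result<(), ()> {
        if task.is_parked.get() {
            return Err(()); // "blocked": retry after being unparked
        }
        self.message_queue.push_back(msg);
        if self.message_queue.len() > self.buffer {
            task.is_parked.set(true);
            self.parked_queue.push_back(Rc::clone(task));
        }
        self.check_invariant();
        Ok(())
    }

    // Receiving: pop a message, then pop one parked_queue entry and
    // unpark it (the entry may belong to an abandoned sender; that's fine).
    fn recv(&mut self) -> Option<i32> {
        let msg = self.message_queue.pop_front()?;
        if let Some(task) = self.parked_queue.pop_front() {
            task.is_parked.set(false);
        }
        self.check_invariant();
        Some(msg)
    }

    // parked_queue.len() == max(0, message_queue.len() - buffer)
    fn check_invariant(&self) {
        let expected = self.message_queue.len().saturating_sub(self.buffer);
        assert_eq!(self.parked_queue.len(), expected);
    }
}

fn main() {
    let mut ch = Channel::new(1);
    let a = Rc::new(SenderTask { is_parked: Cell::new(false) });
    let b = Rc::new(SenderTask { is_parked: Cell::new(false) });

    ch.send(&a, 1).unwrap(); // fills the shared buffer slot
    ch.send(&a, 2).unwrap(); // `a` is parked
    ch.send(&b, 3).unwrap(); // `b` is parked
    assert!(ch.send(&a, 4).is_err()); // `a` is blocked

    // Suppose `b` now gives up (e.g. its send future is dropped by a
    // `select!`). Its parked_queue entry stays behind but blocks nothing:
    drop(b);
    assert_eq!(ch.recv(), Some(1)); // pops `a` off parked_queue and unparks it
    ch.send(&a, 5).unwrap(); // `a` makes progress again
}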

Surprisingly, the senders no longer wait for a shared resource. Instead, each sender waits for its own is_parked state. Even if a sending task is dropped before completion, it just remains in parked_queue for a while and doesn't block anything. How clever!

Upvotes: 2

attdona

Reputation: 18943

I doubt this works in presense of select!, because select! may lead to false notification.

No, you can't "confuse" an mpsc channel using select!:

select! does not trigger any mpsc-related notification; it just resolves with whichever future finishes first.

When the message queue is full, it is await!(recv.next()) that notifies one producer that a slot in the bounded channel is now available.

In other words: there are no true waiters and false waiters; when a channel's message queue is full, the producers block and wait for the receiver side to consume the enqueued messages.

Upvotes: 1
