Prana
Prana

Reputation: 703

Why do threads containing a MPSC channel never join?

I've this example collected from the internet:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Transaction enum
enum Transaction {
    Widthdrawl(String, f64),
    Deposit(String, f64),
}

fn main() {
    //A banking send receive example...
    // set the number of customers
    let n_customers = 10;

    // Create a "customer" and a "banker"
    let (customers, banker) = mpsc::channel();

    let handles = (0..n_customers + 1)
        .into_iter()
        .map(|i| {
            // Create another "customer"
            let customer = customers.clone();

            // Create the customer thread
            let handle = thread::Builder::new()
                .name(format!("{}{}", "thread", i).into())
                .spawn(move || {
                    // Define Transaction
                    let trans_type = match i % 2 {
                        0 => Transaction::Deposit(
                            thread::current().name().unwrap().to_string(),
                            (i + 5) as f64 * 10.0,
                        ),
                        _ => Transaction::Widthdrawl(
                            thread::current().name().unwrap().to_string(),
                            (i + 10) as f64 * 5.0,
                        ),
                    };

                    // Send the Transaction
                    customer.send(trans_type).unwrap();
                });

            handle
        })
        .collect::<Vec<Result<thread::JoinHandle<_>, _>>>();

    // Wait for threads to finish
    for handle in handles {
        handle.unwrap().join().unwrap()
    }

    // Create a bank thread
    let bank = thread::spawn(move || {
        // Create a value
        let mut balance: f64 = 10000.0;
        println!("Initially, Bank value: {}", balance);

        // Perform the transactions in order
        //banker.recv_timeout(Duration::new(5, 0)); <-- TIMEOUT line...

        banker.into_iter().for_each(|i| {
            let mut customer_name: String = "None".to_string();
            match i {
                // Subtract for Widthdrawls
                Transaction::Widthdrawl(cust, amount) => {
                    customer_name = cust;
                    println!(
                        "Customer name {} doing withdrawal of amount {}",
                        customer_name, amount
                    );
                    balance = balance - amount;
                }
                // Add for deposits
                Transaction::Deposit(cust, amount) => {
                    customer_name = cust;
                    println!(
                        "Customer name  {} doing deposit of amount {}",
                        customer_name, amount
                    );
                    balance = balance + amount;
                }
            }

            println!("Customer is {}, Bank value: {}", customer_name, balance);
        });
    });

    // Let the bank finish
    bank.join().unwrap(); //THE THREAD DOES NOT END!!
}

The bank thread never joins, thus not ending the main.

If I remove the join and uncomment the timeout line above, the bank thread sometimes does not wait for the customer threads send (which, I think is ok).

//banker.recv_timeout(Duration::new(5, 0)); <-- TIMEOUT line...

What could be the reason for the bank thread not joining or what could be a better way to make it understand that no more customer messages will be coming? (as I think the timeout() may not be a reliable way in here).

Upvotes: 0

Views: 326

Answers (1)

Prana
Prana

Reputation: 703

I need to drop the tx channel after all producers are done and then the consumer stops after that:

// Wait for threads to finish
for handle in handles {
    handle.unwrap().join().unwrap()
}

drop(customers);

// Create a bank thread

Upvotes: 2

Related Questions