WJM

Reputation: 1191

How do I gracefully exit TcpListener.incoming()?

From the Rust std::net library:

let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();

info!("Opened socket on localhost port {}", port);

// accept connections and process them serially
for stream in listener.incoming() {
    break;
}

info!("closed socket");

How does one make the listener stop listening? It says in the API that when the listener is dropped, it stops. But how do we drop it if incoming() is a blocking call? Preferably without external crates like tokio/mio.

Upvotes: 16

Views: 6801

Answers (3)

euclio

Reputation: 1497

The standard library doesn't provide an API for this, but there are a few strategies you can use to work around it:

Shut down reads on the socket

You can use platform-specific APIs to shut down reads on the socket, which causes the incoming iterator to return an error. You can then break out of the connection-handling loop when that error is received. For example, on a Unix system:

use std::net::TcpListener;
use std::os::unix::io::AsRawFd;
use std::thread;

let listener = TcpListener::bind("localhost:0")?;

let fd = listener.as_raw_fd();

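let handle = thread::spawn(move || {
    for connection in listener.incoming() {
        match connection {
            Ok(connection) => { /* handle connection */ }
            // Shutting down reads makes the iterator yield an error, which ends the loop.
            Err(_) => break,
        }
    }
});

// `libc::shutdown` is an FFI function, so the call must be wrapped in `unsafe`.
unsafe {
    libc::shutdown(fd, libc::SHUT_RD);
}

handle.join().unwrap();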

Force the listener to wake up

Another (cross-platform) trick is to set a variable indicating that you want to stop listening, and then connect to the socket yourself to force the listening thread to wake up. When the listening thread wakes up, it checks the "stop listening" variable, and then exits cleanly if it's set.

use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

let listener = TcpListener::bind("localhost:0")?;
let local_addr = listener.local_addr()?;

let shutdown = Arc::new(AtomicBool::new(false));
let server_shutdown = shutdown.clone();
let handle = thread::spawn(move || {
    for connection in listener.incoming() {
        if server_shutdown.load(Ordering::Relaxed) {
            return;
        }

        match connection {
            Ok(connection) => { /* handle connection */ }
            Err(_) => break,
        }
    }
});

shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(local_addr);

handle.join().unwrap();
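
To make the wake-up trick reusable, the flag, the bound address, and the thread handle can be bundled into one type. What follows is only a minimal sketch under stated assumptions, not a definitive implementation: `Server`, `spawn`, `handled`, and `stop` are names invented for illustration, and "handling" a connection here just means counting it.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical handle that owns everything needed to stop the accept loop.
pub struct Server {
    addr: SocketAddr,
    shutdown: Arc<AtomicBool>,
    handled: Arc<AtomicUsize>,
    thread: JoinHandle<()>,
}

impl Server {
    /// Binds to an ephemeral localhost port and starts the accept loop.
    pub fn spawn() -> io::Result<Server> {
        let listener = TcpListener::bind("127.0.0.1:0")?;
        let addr = listener.local_addr()?;
        let shutdown = Arc::new(AtomicBool::new(false));
        let handled = Arc::new(AtomicUsize::new(0));
        let (flag, count) = (shutdown.clone(), handled.clone());

        let thread = thread::spawn(move || {
            for connection in listener.incoming() {
                // Check the flag first so the wake-up connection is not handled.
                if flag.load(Ordering::SeqCst) {
                    break;
                }
                match connection {
                    Ok(_stream) => {
                        // Placeholder for real work: just count the connection.
                        count.fetch_add(1, Ordering::SeqCst);
                    }
                    Err(_) => break,
                }
            }
            // `listener` is dropped here, which closes the socket.
        });

        Ok(Server { addr, shutdown, handled, thread })
    }

    /// Address the listener is bound to.
    pub fn addr(&self) -> SocketAddr {
        self.addr
    }

    /// Number of connections handled so far.
    pub fn handled(&self) -> usize {
        self.handled.load(Ordering::SeqCst)
    }

    /// Sets the flag, wakes the accept loop with a throwaway connection,
    /// waits for the thread, and returns the number of handled connections.
    pub fn stop(self) -> usize {
        self.shutdown.store(true, Ordering::SeqCst);
        let _ = TcpStream::connect(self.addr);
        self.thread.join().expect("accept thread panicked");
        self.handled.load(Ordering::SeqCst)
    }
}
```

A caller would create the server with `Server::spawn()`, hand out `server.addr()` to clients, and call `server.stop()` when done. Connections still waiting in the backlog when `stop` is called are not handled, because the flag is checked before each one.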

Upvotes: 15

Proton

Reputation: 664

You can poll your socket together with an eventfd, which is used for signaling (eventfd is Linux-specific). I wrote a helper for this.

let shutdown = EventFd::new();
let listener = TcpListener::bind("0.0.0.0:12345")?;
let incoming = CancellableIncoming::new(&listener, &shutdown);

for stream in incoming {
    // Your logic
}

// While in other thread
shutdown.add(1);  // Light the shutdown signal, now your incoming loop exits gracefully.

The helper itself:

use nix;
use nix::poll::{poll, PollFd, PollFlags};
use nix::sys::eventfd::{eventfd, EfdFlags};
use nix::unistd::{close, write};
use std;
use std::net::{TcpListener, TcpStream};
use std::os::unix::io::{AsRawFd, RawFd};

pub struct EventFd {
    fd: RawFd,
}

impl EventFd {
    pub fn new() -> Self {
        EventFd {
            fd: eventfd(0, EfdFlags::empty()).unwrap(),
        }
    }

    pub fn add(&self, v: u64) -> nix::Result<usize> {
        // eventfd expects an 8-byte unsigned integer in native byte order.
        let b = v.to_ne_bytes();
        write(self.fd, &b)
    }
}

impl AsRawFd for EventFd {
    fn as_raw_fd(&self) -> RawFd {
        self.fd
    }
}

impl Drop for EventFd {
    fn drop(&mut self) {
        let _ = close(self.fd);
    }
}

// -----
//
pub struct CancellableIncoming<'a> {
    listener: &'a TcpListener,
    eventfd: &'a EventFd,
}

impl<'a> CancellableIncoming<'a> {
    pub fn new(listener: &'a TcpListener, eventfd: &'a EventFd) -> Self {
        Self { listener, eventfd }
    }
}

impl<'a> Iterator for CancellableIncoming<'a> {
    type Item = std::io::Result<TcpStream>;
    fn next(&mut self) -> Option<std::io::Result<TcpStream>> {
        use nix::errno::Errno;

        let fd = self.listener.as_raw_fd();
        let evfd = self.eventfd.as_raw_fd();
        let mut poll_fds = vec![
            PollFd::new(fd, PollFlags::POLLIN),
            PollFd::new(evfd, PollFlags::POLLIN),
        ];

        loop {
            match poll(&mut poll_fds, -1) {
                Ok(_) => break,
                Err(nix::Error::Sys(Errno::EINTR)) => continue,
                _ => panic!("Error polling"),
            }
        }

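        // Check the shutdown signal first so a busy listener cannot delay it.
        // `contains` is used because other flags may be set alongside POLLIN.
        if poll_fds[1].revents().unwrap().contains(PollFlags::POLLIN) {
            None
        } else if poll_fds[0].revents().unwrap().contains(PollFlags::POLLIN) {
            Some(self.listener.accept().map(|p| p.0))
        } else {
            panic!("Can't be!");
        }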
    }
}

Upvotes: 5

effect

Reputation: 1455

You'll want to put the TcpListener into non-blocking mode using the set_nonblocking() method, like so:

use std::io;
use std::net::TcpListener;

let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
listener.set_nonblocking(true).expect("Cannot set non-blocking");

for stream in listener.incoming() {
    match stream {
        Ok(s) => {
            // do something with the TcpStream
            handle_connection(s);
        }
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
            // No connection is pending. Decide if we should exit;
            // `should_exit()` is a placeholder for your own check.
            if should_exit() {
                break;
            }
            // Otherwise try to accept a connection again
            continue;
        }
        Err(e) => panic!("encountered IO error: {}", e),
    }
}

Instead of waiting for a connection, each iteration of incoming() now returns a Result immediately. If the Result is Ok, a connection was made and you can process it. If the Result is an Err whose kind is WouldBlock, this isn't actually an error: there just wasn't a connection pending at the moment the socket was checked.

Note that in the WouldBlock case, you may want to put a sleep() or something before continuing, otherwise your program will rapidly poll the incoming() function checking for a connection, resulting in high CPU usage.

Code example adapted from here
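
The exit decision and the sleep mentioned above could be combined as in the following sketch. It is only one possible shape, under stated assumptions: `accept_until_stopped`, `stop`, and `poll_interval` are illustrative names, the stop flag is an `AtomicBool` set from another thread, and the connection handling is reduced to counting.

```rust
use std::io;
use std::net::TcpListener;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Accepts connections until `stop` becomes true and returns how many
/// connections were accepted. Sleeps for `poll_interval` whenever no
/// connection is pending, to avoid spinning at full CPU usage.
pub fn accept_until_stopped(
    listener: &TcpListener,
    stop: &AtomicBool,
    poll_interval: Duration,
) -> io::Result<usize> {
    listener.set_nonblocking(true)?;
    let mut accepted = 0;

    while !stop.load(Ordering::SeqCst) {
        match listener.accept() {
            Ok((_stream, _peer)) => {
                // Placeholder for real work. Depending on the platform, the
                // accepted stream may also be non-blocking; call
                // `set_nonblocking(false)` on it if blocking I/O is wanted.
                accepted += 1;
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // Nothing pending: wait a little, then re-check the flag.
                thread::sleep(poll_interval);
            }
            Err(e) => return Err(e),
        }
    }

    Ok(accepted)
}
```

The trade-off is latency against CPU usage: a longer `poll_interval` means less polling, but both new connections and the stop request are noticed later.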

Upvotes: 10
