From a765026c4c31c49efd0f7f2f4611c5fde6fc5d9d Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Tue, 4 Feb 2020 17:49:07 -0800 Subject: [PATCH] Adopt fd_monitor in bufferfill This switches bufferfills from using an exclusively-owned thread, to sharing an fd_monitor. This allows multiple bufferfills to all use the same thread. --- src/io.cpp | 165 +++++++++++++++++++++++++---------------------------- src/io.h | 9 +-- 2 files changed, 82 insertions(+), 92 deletions(-) diff --git a/src/io.cpp b/src/io.cpp index 22bf1dd7c..42fabfd46 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -15,6 +15,7 @@ #include "common.h" #include "exec.h" #include "fallback.h" // IWYU pragma: keep +#include "fd_monitor.h" #include "iothread.h" #include "path.h" #include "redirection.h" @@ -27,6 +28,13 @@ /// Base open mode to pass to calls to open. #define OPEN_MASK 0666 +/// Provide the fd monitor used for background fillthread operations. +static fd_monitor_t &fd_monitor() { + // Deliberately leaked to avoid shutdown dtors. + static fd_monitor_t *fdm = new fd_monitor_t(); + return *fdm; +} + io_data_t::~io_data_t() = default; io_pipe_t::~io_pipe_t() = default; io_fd_t::~io_fd_t() = default; @@ -58,94 +66,43 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) { buffer_.append_wide_buffer(input); } -void io_buffer_t::run_background_fillthread(autoclose_fd_t readfd) { - // Here we are running the background fillthread, executing in a background thread. - // Our plan is: - // 1. poll via select() until the fd is readable. - // 2. Acquire the append lock. - // 3. read until EAGAIN (would block), appending - // 4. release the lock - // The purpose of holding the lock around the read calls is to ensure that data from background - // processes isn't weirdly interspersed with data directly transferred (from a builtin to a - // buffer). 
+ssize_t io_buffer_t::read_once(int fd) { + assert(fd >= 0 && "Invalid fd"); + ASSERT_IS_LOCKED(append_lock_); + errno = 0; + char buff[4096 * 4]; - const int fd = readfd.fd(); - - // 100 msec poll rate. Note that in most cases, the write end of the pipe will be closed so - // select() will return; the polling is important only for weird cases like a background process - // launched in a command substitution. - const long poll_timeout_usec = 100000; - struct timeval tv = {}; - tv.tv_usec = poll_timeout_usec; - - bool shutdown = false; - while (!shutdown) { - bool readable = false; - - // Poll if our fd is readable. - // Do this even if the shutdown flag is set. It's important we wait for the fd at least - // once. For short-lived processes, it's possible for the process to execute, produce output - // (fits in the pipe buffer) and be reaped before we are even scheduled. So always wait at - // least once on the fd. Note that doesn't mean we will wait for the full poll duration; - // typically what will happen is our pipe will be widowed and so this will return quickly. - // It's only for weird cases (e.g. a background process launched inside a command - // substitution) that we'll wait out the entire poll time. - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - int ret = select(fd + 1, &fds, nullptr, nullptr, &tv); - // select(2) is allowed to (and does) update `tv` to indicate how much time was left, so we - // need to restore the desired value each time. - tv.tv_usec = poll_timeout_usec; - readable = ret > 0; - if (ret < 0 && errno != EINTR) { - // Surprising error. - wperror(L"select"); - return; - } - - // Only check the shutdown flag if we timed out. - // It's important that if select() indicated we were readable, that we call select() again - // allowing it to time out. Note the typical case is that the fd will be closed, in which - // case select will return immediately. 
- if (!readable) { - shutdown = this->shutdown_fillthread_; - } - - if (readable || shutdown) { - // Now either our fd is readable, or we have set the shutdown flag. - // Either way acquire the lock and read until we reach EOF, or EAGAIN / EINTR. - scoped_lock locker(append_lock_); - ssize_t ret; - do { - errno = 0; - char buff[4096]; - ret = read(fd, buff, sizeof buff); - if (ret > 0) { - buffer_.append(&buff[0], &buff[ret]); - } else if (ret == 0) { - shutdown = true; - } else if (ret == -1 && errno == 0) { - // No specific error. We assume we just return, - // since that's what we do in read_blocked. - return; - } else if (errno != EINTR && errno != EAGAIN) { - wperror(L"read"); - return; - } - } while (ret > 0); - } + // We want to swallow EINTR only; in particular EAGAIN needs to be returned back to the caller. + ssize_t ret; + do { + ret = read(fd, buff, sizeof buff); + } while (ret < 0 && errno == EINTR); + if (ret < 0 && errno != EAGAIN) { + wperror(L"read"); + } else if (ret > 0) { + buffer_.append(&buff[0], &buff[ret]); } - assert(shutdown && "Should only exit loop if shutdown flag is set"); + return ret; } -void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { +void io_buffer_t::begin_filling(autoclose_fd_t fd) { ASSERT_IS_MAIN_THREAD(); assert(!fillthread_running() && "Already have a fillthread"); - // We want our background thread to own the fd but it's not easy to move into a std::function. - // Use a shared_ptr. - auto fdref = move_to_sharedptr(std::move(fd)); + // We want to fill buffer_ by reading from fd. fd is the read end of a pipe; the write end is + // owned by another process, or something else writing in fish. + // Pass fd to an fd_monitor. It will add fd to its select() loop, and give us a callback when + // the fd is readable, or when our timeout is hit. The usual path is that we will get called + // back, read a bit from the fd, and append it to the buffer. 
Eventually the write end of the + // pipe will be closed - probably the other process exited - and fd will be widowed; read() will + // then return 0 and we will stop reading. + // In exotic circumstances the write end of the pipe will not be closed; this may happen in + // e.g.: + // cmd ( background & ; echo hi ) + // Here the background process will inherit the write end of the pipe and hold onto it forever. + // In this case, we will hit the timeout on waiting for more data and notice that the shutdown + // flag is set (this indicates that the command substitution is done); in this case we will read + // until we get EAGAIN and then give up. // Construct a promise that can go into our background thread. auto promise = std::make_shared<std::promise<void>>(); @@ -154,13 +111,45 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { // Note this should only ever be called once. fillthread_waiter_ = promise->get_future(); + // 100 msec poll rate. Note that in most cases, the write end of the pipe will be closed so + // select() will return; the polling is important only for weird cases like a background process + // launched in a command substitution. + constexpr uint64_t usec_per_msec = 1000; + uint64_t poll_usec = 100 * usec_per_msec; + // Run our function to read until the receiver is closed. - // It's OK to capture 'this' by value because 'this' owns the background thread and waits for it - // before dtor. - iothread_perform_cantwait([this, promise, fdref]() { - this->run_background_fillthread(std::move(*fdref)); - promise->set_value(); - }); + // It's OK to capture 'this' by value because 'this' waits for the promise in its dtor. + fd_monitor_item_t item; + item.fd = std::move(fd); + item.timeout_usec = poll_usec; + item.callback = [this, promise](autoclose_fd_t &fd, bool timed_out) { + ASSERT_IS_BACKGROUND_THREAD(); + // Only check the shutdown flag if we timed out. 
+ // It's important that if select() indicated we were readable, that we call select() again + // allowing it to time out. Note the typical case is that the fd will be closed, in which + // case select will return immediately. + bool done = false; + if (!timed_out) { + // select() reported us as readable; read a bit. + scoped_lock locker(append_lock_); + ssize_t ret = read_once(fd.fd()); + done = (ret == 0 || (ret < 0 && errno != EAGAIN)); + } else if (shutdown_fillthread_) { + // Here our caller asked us to shut down; read while we keep getting data. + // This will stop when the fd is closed or if we get EAGAIN. + scoped_lock locker(append_lock_); + ssize_t ret; + do { + ret = read_once(fd.fd()); + } while (ret > 0); + done = true; + } + if (done) { + fd.close(); + promise->set_value(); + } + }; + fd_monitor().add(std::move(item)); } void io_buffer_t::complete_background_fillthread() { @@ -191,7 +180,7 @@ shared_ptr<io_bufferfill_t> io_bufferfill_t::create(const fd_set_t &conflicts, } // Our fillthread gets the read end of the pipe; out_pipe gets the write end. auto buffer = std::make_shared<io_buffer_t>(buffer_limit); - buffer->begin_background_fillthread(std::move(pipes->read)); + buffer->begin_filling(std::move(pipes->read)); return std::make_shared<io_bufferfill_t>(std::move(pipes->write), buffer); } diff --git a/src/io.h b/src/io.h index f88e88862..951271c14 100644 --- a/src/io.h +++ b/src/io.h @@ -308,11 +308,12 @@ class io_buffer_t { /// Lock for appending. std::mutex append_lock_{}; - /// Called in the background thread to run it. - void run_background_fillthread(autoclose_fd_t readfd); + /// Read a bit, filling the buffer. The append lock must be held. + /// \return positive on success, 0 if closed, -1 on error (in which case errno will be set). + ssize_t read_once(int fd); - /// Begin the fill operation, reading from the given fd in the background. 
+ void begin_filling(autoclose_fd_t readfd); /// End the background fillthread operation. void complete_background_fillthread();