From ada8ea954e0891b9aab9fc8310ba9393aa606ad2 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Wed, 13 Feb 2019 15:17:07 -0800 Subject: [PATCH] Use "internal" processes to write buffered output This introduces "internal processes" which are backed by a pthread instead of a normal process. Internal processes are reaped using the topic machinery, plugging in neatly alongside the sigchld topic; this means that process_mark_finished_children() can wait for internal and external processes simultaneously. Initially internal processes replace the forked process that fish uses to write out the output of blocks and functions. --- src/common.h | 6 +++ src/exec.cpp | 106 ++++++++++++++++++++++++++++++++------------ src/fish_tests.cpp | 2 +- src/io.cpp | 3 +- src/proc.cpp | 60 +++++++++++++++++-------- src/proc.h | 44 ++++++++++++++++-- src/topic_monitor.h | 5 ++- 7 files changed, 171 insertions(+), 55 deletions(-) diff --git a/src/common.h b/src/common.h index 1433ad151..b74d6d98d 100644 --- a/src/common.h +++ b/src/common.h @@ -307,6 +307,12 @@ void vec_append(std::vector &receiver, std::vector &&donator) { std::make_move_iterator(donator.end())); } +/// Move an object into a shared_ptr. +template +std::shared_ptr move_to_sharedptr(T &&v) { + return std::make_shared(std::move(v)); +} + /// Print a stack trace to stderr. void show_stackframe(const wchar_t msg_level, int frame_count = 100, int skip_levels = 0); diff --git a/src/exec.cpp b/src/exec.cpp index c9d662464..dfd7cf8ef 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -34,6 +34,7 @@ #include "fallback.h" // IWYU pragma: keep #include "function.h" #include "io.h" +#include "iothread.h" #include "parse_tree.h" #include "parser.h" #include "postfork.h" @@ -55,16 +56,6 @@ /// Base open mode to pass to calls to open. #define OPEN_MASK 0666 -/// Called in a forked child. -static void exec_write_and_exit(int fd, const char *buff, size_t count, int status) { - if (write_loop(fd, buff, count) == -1) { - debug(0, WRITE_ERROR); - wperror(L"write"); - exit_without_destructors(status); - } - exit_without_destructors(status); -} - void exec_close(int fd) { ASSERT_IS_MAIN_THREAD(); @@ -361,6 +352,80 @@ static void on_process_created(const std::shared_ptr &j, pid_t child_pid) } } +/// Construct an internal process for the process p. In the background, write the data \p outdata to +/// stdout, respecting the io chain \p ios. For example if target_fd is 1 (stdout), and there is a +/// dup2 3->1, then we need to write to fd 3. Then exit the internal process. +static bool run_internal_process(process_t *p, std::string outdata, io_chain_t ios) { + p->check_generations_before_launch(); + + // We want both the dup2s and the io_chain_ts to be kept alive by the background thread, because + // they may own an fd that we want to write to. Move them all to a shared_ptr. The strings as + // well (they may be long). + // Construct a little helper struct to make it simpler to move into our closure without copying. + struct write_fields_t { + int src_outfd{-1}; + std::string outdata{}; + + io_chain_t ios{}; + maybe_t dup2s{}; + std::shared_ptr internal_proc{}; + + int success_status{}; + + bool skip_out() const { return outdata.empty() || src_outfd < 0; } + }; + + auto f = std::make_shared(); + f->outdata = std::move(outdata); + + // Construct and assign the internal process to the real process. + p->internal_proc_ = std::make_shared(); + f->internal_proc = p->internal_proc_; + + // Resolve the IO chain. + // Note it's important we do this even if we have no out or err data, because we may have been + // asked to truncate a file (e.g. `echo -n '' > /tmp/truncateme.txt'). The open() in the dup2 + // list resolution will ensure this happens. + f->dup2s = dup2_list_t::resolve_chain(ios); + if (!f->dup2s) { + return false; + } + + // Figure out which source fds to write to. If they are closed (unlikely) we just exit + // successfully. + f->src_outfd = f->dup2s->fd_for_target_fd(STDOUT_FILENO); + + // If we have nothing to right we can elide the thread. + // TODO: support eliding output to /dev/null. + if (f->skip_out()) { + f->internal_proc->mark_exited(EXIT_SUCCESS); + return true; + } + + // Ensure that ios stays alive, it may own fds. + f->ios = ios; + + // If our process is a builtin, it will have already set its status value. Make sure we + // propagate that if our I/O succeeds and don't read it on a background thread. TODO: have + // builtin_run provide this directly, rather than setting it in the process. + f->success_status = p->status; + + iothread_perform([f]() { + int status = f->success_status; + if (!f->skip_out()) { + ssize_t ret = write_loop(f->src_outfd, f->outdata.data(), f->outdata.size()); + if (ret < 0) { + if (errno != EPIPE) { + wperror(L"write"); + } + if (!status) status = 1; + } + } + f->internal_proc->mark_exited(status); + }); + return true; +} + /// Call fork() as part of executing a process \p p in a job \j. Execute \p child_action in the /// context of the child. Returns true if fork succeeded, false if fork failed. static bool fork_child_for_process(const std::shared_ptr &job, process_t *p, @@ -784,24 +849,9 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr io_chain.remove(block_output_bufferfill); auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill)); - // Resolve our IO chain to a sequence of dup2s. - auto dup2s = dup2_list_t::resolve_chain(io_chain); - if (!dup2s) { - return false; - } - - const std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); - const char *buffer = buffer_contents.data(); - size_t count = buffer_contents.size(); - if (count > 0) { - // We don't have to drain threads here because our child process is simple. - const char *fork_reason = - p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io"; - if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] { - exec_write_and_exit(STDOUT_FILENO, buffer, count, status); - })) { - return false; - } + std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); + if (!buffer_contents.empty()) { + return run_internal_process(p, std::move(buffer_contents), io_chain); } else { if (p->is_last_in_job) { proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status); diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 5f83e1c06..351e49a77 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -5106,7 +5106,7 @@ static void test_topic_monitor() { constexpr auto t = topic_t::sigchld; do_test(gens[t] == 0); do_test(monitor.generation_for_topic(t) == 0); - auto changed = monitor.check(&gens, {t}, false /* wait */); + auto changed = monitor.check(&gens, topic_set_t{t}, false /* wait */); do_test(changed.none()); do_test(gens[t] == 0); diff --git a/src/io.cpp b/src/io.cpp index 6f72c3eb0..67a9c03d5 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -13,6 +13,7 @@ #include "fallback.h" // IWYU pragma: keep #include "io.h" #include "iothread.h" +#include "redirection.h" #include "wutil.h" // IWYU pragma: keep io_data_t::~io_data_t() = default; @@ -122,7 +123,7 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { // We want our background thread to own the fd but it's not easy to move into a std::function. // Use a shared_ptr. - auto fdref = std::make_shared(std::move(fd)); + auto fdref = move_to_sharedptr(std::move(fd)); // Our function to read until the receiver is closed. // It's OK to capture 'this' by value because 'this' owns the background thread and joins it diff --git a/src/proc.cpp b/src/proc.cpp index 667d76ace..a8ebc4793 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -247,6 +247,13 @@ bool job_t::signal(int signal) { return true; } +void internal_proc_t::mark_exited(int status) { + assert(!exited() && "Process is already exited"); + exited_.store(true, std::memory_order_relaxed); + status_.store(status, std::memory_order_release); + topic_monitor_t::principal().post(topic_t::internal_exit); +} + static void mark_job_complete(const job_t *j) { for (auto &p : j->processes) { p->completed = 1; @@ -374,48 +381,63 @@ static void process_mark_finished_children(bool block_ok) { // Get the exit and signal generations of all reapable processes. // The exit generation tells us if we have an exit; the signal generation allows for detecting // SIGHUP and SIGINT. + // Get the gen count of all reapable processes. + topic_set_t reaptopics{}; generation_list_t gens{}; gens.fill(invalid_generation); job_iterator_t jobs; while (auto *j = jobs.next()) { for (const auto &proc : j->processes) { - if (j->can_reap(proc.get())) { - gens[topic_t::sigchld] = - std::min(gens[topic_t::sigchld], proc->gens_[topic_t::sigchld]); + if (auto mtopic = j->reap_topic_for_process(proc.get())) { + topic_t topic = *mtopic; + reaptopics.set(topic); + gens[topic] = std::min(gens[topic], proc->gens_[topic]); + + reaptopics.set(topic_t::sighupint); gens[topic_t::sighupint] = std::min(gens[topic_t::sighupint], proc->gens_[topic_t::sighupint]); } } } - if (gens[topic_t::sigchld] == invalid_generation) { + if (reaptopics.none()) { // No reapable processes, nothing to wait for. return; } // Now check for changes, optionally waiting. - topic_set_t topics{{topic_t::sigchld, topic_t::sighupint}}; - auto changed_topics = topic_monitor_t::principal().check(&gens, topics, block_ok); + auto changed_topics = topic_monitor_t::principal().check(&gens, reaptopics, block_ok); if (changed_topics.none()) return; // We got some changes. Since we last checked we received SIGCHLD, and or HUP/INT. // Update the hup/int generations and reap any reapable processes. jobs.reset(); while (auto *j = jobs.next()) { - for (auto &proc : j->processes) { - // Update the signalhupint generation so we don't break on old sighupints. - proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint]; + for (const auto &proc : j->processes) { + if (auto mtopic = j->reap_topic_for_process(proc.get())) { + // Update the signal hup/int gen. + proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint]; - // Try reaping processes whose sigchld count is below what was returned. - if (changed_topics.get(topic_t::sigchld)) { - if (j->can_reap(proc.get()) && - proc->gens_[topic_t::sigchld] < gens[topic_t::sigchld]) { - proc->gens_[topic_t::sigchld] = gens[topic_t::sigchld]; - int status = 0; - auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED); - if (pid > 0) { - debug(4, "Reaped PID %d", pid); - handle_child_status(pid, status); + if (proc->gens_[*mtopic] < gens[*mtopic]) { + // Potentially reapable. Update its gen count and try reaping it. + proc->gens_[*mtopic] = gens[*mtopic]; + if (proc->internal_proc_) { + // Try reaping an internal process. + if (proc->internal_proc_->exited()) { + proc->status = proc->internal_proc_->get_status(); + proc->completed = true; + } + } else if (proc->pid > 0) { + // Try reaping an external process. + int status = -1; + auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED); + if (pid > 0) { + assert(pid == proc->pid && "Unexpcted waitpid() return"); + debug(4, "Reaped PID %d", pid); + handle_child_status(pid, status); + } + } else { + assert(0 && "Don't know how to reap this process"); } } } diff --git a/src/proc.h b/src/proc.h index 7379bd065..c588cf766 100644 --- a/src/proc.h +++ b/src/proc.h @@ -42,6 +42,28 @@ enum { JOB_CONTROL_NONE, }; +/// A structure representing a "process" internal to fish. This is backed by a pthread instead of a +/// separate process. +class internal_proc_t { + /// Whether the process has exited. + std::atomic exited_{}; + + /// If the process has exited, its status code. + std::atomic status_{}; + + public: + /// \return if this process has exited. + bool exited() const { return exited_.load(std::memory_order_relaxed); } + + /// Mark this process as exited, with the given status. + void mark_exited(int status); + + int get_status() const { + assert(exited() && "Process is not exited"); + return status_.load(std::memory_order_acquire); + } +}; + /// A structure representing a single fish process. Contains variables for tracking process state /// and the process argument list. Actually, a fish process can be either a regular external /// process, an internal builtin which may or may not spawn a fake IO process during execution, a @@ -126,6 +148,10 @@ class process_t { /// Process ID pid_t pid{0}; + + /// If we are an "internal process," that process. + std::shared_ptr internal_proc_{}; + /// File descriptor that pipe output should bind to. int pipe_write_fd{0}; /// True if process has completed. @@ -214,15 +240,25 @@ class job_t { /// process if it is the group leader and the job is not yet constructed, because then we might /// also reap the process group and then we cannot add new processes to the group. bool can_reap(const process_t *p) const { - if (p->pid <= 0) { + // Internal processes can always be reaped. + if (p->internal_proc_) { + return true; + } else if (p->pid <= 0) { // Can't reap without a pid. return false; - } - if (!is_constructed() && pgid > 0 && p->pid == pgid) { + } else if (!is_constructed() && pgid > 0 && p->pid == pgid) { // p is the the group leader in an under-construction job. return false; + } else { + return true; } - return true; + } + + /// \returns the reap topic for a process, which describes the manner in which we are reaped. A + /// none returns means don't reap, or perhaps defer reaping. + maybe_t reap_topic_for_process(const process_t *p) const { + if (p->completed || !can_reap(p)) return none(); + return p->internal_proc_ ? topic_t::internal_exit : topic_t::sigchld; } /// Returns a truncated version of the job string. Used when a message has already been emitted diff --git a/src/topic_monitor.h b/src/topic_monitor.h index f74c900ec..78498c4cc 100644 --- a/src/topic_monitor.h +++ b/src/topic_monitor.h @@ -35,8 +35,9 @@ /// The list of topics that may be observed. enum class topic_t : uint8_t { - sigchld, // Corresponds to SIGCHLD signal. - sighupint, // Corresponds to both SIGHUP and SIGINT signals. + sigchld, // Corresponds to SIGCHLD signal. + sighupint, // Corresponds to both SIGHUP and SIGINT signals. + internal_exit, // Corresponds to an internal process exit. COUNT };