From 78ed659151b8fd33e9a269be6082a8b28738962a Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sat, 16 Feb 2019 14:19:59 -0800 Subject: [PATCH 01/10] Fancify enum_set and introduce enum_iter_t Allow iterating over the values of an enum class. --- src/enum_set.h | 87 ++++++++++++++++++++++++++++++++++++++-------- src/fish_tests.cpp | 33 ++++++++++++++++++ src/proc.h | 7 ++++ 3 files changed, 113 insertions(+), 14 deletions(-) diff --git a/src/enum_set.h b/src/enum_set.h index 6ef6e2c8c..c26f56306 100644 --- a/src/enum_set.h +++ b/src/enum_set.h @@ -1,26 +1,85 @@ #pragma once #include +#include +#include + +/// A type (to specialize) that provides a count for an enum. +/// Example: +/// template<> struct enum_info_t +/// { static constexpr auto count = MyEnum::COUNT; }; +template +struct enum_info_t {}; template -class enum_set_t { +class enum_set_t : private std::bitset(enum_info_t::count)> { private: - using base_type_t = typename std::underlying_type::type; - std::bitset<8 * sizeof(base_type_t)> bitmask{0}; - static int index_of(T t) { return static_cast(t); } + using super = std::bitset(enum_info_t::count)>; + static size_t index_of(T t) { return static_cast(t); } + + explicit enum_set_t(unsigned long raw) : super(raw) {} public: - bool get(T t) const { return bitmask.test(index_of(t)); } + enum_set_t() = default; - void set(T t, bool v) { - if (v) { - bitmask.set(index_of(t)); - } else { - bitmask.reset(index_of(t)); - } - } + explicit enum_set_t(T v) { set(v); } - void set(T t) { bitmask.set(index_of(t)); } + static enum_set_t from_raw(unsigned long v) { return enum_set_t{v}; } - void clear(T t) { bitmask.reset(index_of(t)); } + unsigned long to_raw() const { return super::to_ulong(); } + + bool get(T t) const { return super::test(index_of(t)); } + + void set(T t, bool v = true) { super::set(index_of(t), v); } + + void clear(T t) { super::reset(index_of(t)); } + + bool none() const { return super::none(); } + + bool any() const { return super::any(); } + + bool operator==(const enum_set_t &rhs) const { return super::operator==(rhs); } + + bool operator!=(const enum_set_t &rhs) const { return super::operator!=(rhs); } +}; + +/// A counting iterator for an enum class. +/// This enumerates the values of an enum class from 0 up to (not including) count. +/// Example: +/// for (auto v : enum_iter_t) {...} +template +class enum_iter_t { + using base_type_t = typename std::underlying_type::type; + struct iterator_t { + friend class enum_iter_t; + using iterator_category = std::forward_iterator_tag; + using value_type = T; + using difference_type = long; + + explicit iterator_t(base_type_t v) : v_(v) {} + + T operator*() const { return static_cast(v_); } + const T *operator->() const { return static_cast(v_); } + + iterator_t &operator++() { + v_ += 1; + return *this; + } + + iterator_t operator++(int) { + auto res = *this; + v_ += 1; + return res; + } + + bool operator==(iterator_t rhs) const { return v_ == rhs.v_; } + bool operator!=(iterator_t rhs) const { return v_ != rhs.v_; } + + private: + base_type_t v_{}; + }; + + public: + iterator_t begin() const { return iterator_t{0}; } + iterator_t end() const { return iterator_t{static_cast(enum_info_t::count)}; } }; diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 7b1f5c6c3..f3c49128c 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -286,6 +286,38 @@ static void test_str_to_num() { L"converting invalid num to long did not fail"); } +enum class test_enum { alpha, beta, gamma, COUNT }; + +template <> +struct enum_info_t { + static constexpr auto count = test_enum::COUNT; +}; + +static void test_enum_set() { + say(L"Testing enum set"); + enum_set_t es; + do_test(es.none()); + do_test(!es.any()); + do_test(es.to_raw() == 0); + do_test(es == enum_set_t::from_raw(0)); + do_test(es != enum_set_t::from_raw(1)); + + es.set(test_enum::beta); + do_test(es.to_raw() == 2); + do_test(es == enum_set_t::from_raw(2)); + do_test(es == enum_set_t{test_enum::beta}); + do_test(es != enum_set_t::from_raw(3)); + do_test(es.any()); + do_test(!es.none()); + + unsigned idx = 0; + for (auto v : enum_iter_t{}) { + do_test(static_cast(v) == idx); + idx++; + } + do_test(static_cast(test_enum::COUNT) == idx); +} + /// Test sane escapes. static void test_unescape_sane() { const struct test_t { @@ -5082,6 +5114,7 @@ int main(int argc, char **argv) { if (should_test_function("wcstring_tok")) test_wcstring_tok(); if (should_test_function("env_vars")) test_env_vars(); if (should_test_function("str_to_num")) test_str_to_num(); + if (should_test_function("enum")) test_enum_set(); if (should_test_function("highlighting")) test_highlighting(); if (should_test_function("new_parser_ll2")) test_new_parser_ll2(); if (should_test_function("new_parser_fuzzing")) diff --git a/src/proc.h b/src/proc.h index 6950a7031..03472029d 100644 --- a/src/proc.h +++ b/src/proc.h @@ -153,6 +153,13 @@ enum class job_flag_t { JOB_CONTROL, /// Whether the job wants to own the terminal when in the foreground. TERMINAL, + + JOB_FLAG_COUNT +}; + +template <> +struct enum_info_t { + static constexpr auto count = job_flag_t::JOB_FLAG_COUNT; }; typedef int job_id_t; From ccc45235b0774d4434a578ce3c116401a70cd16b Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Mon, 11 Feb 2019 20:23:15 -0800 Subject: [PATCH 02/10] Introduce enum_array_t Allows for indexing an array via an enum class. --- src/enum_set.h | 28 +++++++++++++++++++++++++--- src/fish_tests.cpp | 11 +++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/enum_set.h b/src/enum_set.h index c26f56306..ea6e90b40 100644 --- a/src/enum_set.h +++ b/src/enum_set.h @@ -11,10 +11,17 @@ template struct enum_info_t {}; +/// \return the count of an enum. template -class enum_set_t : private std::bitset(enum_info_t::count)> { +constexpr size_t enum_count() { + return static_cast(enum_info_t::count); +} + +/// A bit set indexed by an enum type. +template +class enum_set_t : private std::bitset()> { private: - using super = std::bitset(enum_info_t::count)>; + using super = std::bitset()>; static size_t index_of(T t) { return static_cast(t); } explicit enum_set_t(unsigned long raw) : super(raw) {} @@ -43,6 +50,21 @@ class enum_set_t : private std::bitset(enum_info_t::count bool operator!=(const enum_set_t &rhs) const { return super::operator!=(rhs); } }; +/// An array of Elem indexed by an enum class. +template +class enum_array_t : public std::array()> { + using super = std::array()>; + using base_type_t = typename std::underlying_type::type; + + static int index_of(T t) { return static_cast(t); } + + public: + Elem &at(T t) { return super::at(index_of(t)); } + const Elem &at(T t) const { return super::at(index_of(t)); } + Elem &operator[](T t) { return super::operator[](index_of(t)); } + const Elem &operator[](T t) const { return super::operator[](index_of(t)); } +}; + /// A counting iterator for an enum class. /// This enumerates the values of an enum class from 0 up to (not including) count. /// Example: @@ -81,5 +103,5 @@ class enum_iter_t { public: iterator_t begin() const { return iterator_t{0}; } - iterator_t end() const { return iterator_t{static_cast(enum_info_t::count)}; } + iterator_t end() const { return iterator_t{static_cast(enum_count())}; } }; diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index f3c49128c..7b02115c0 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -318,6 +318,16 @@ static void test_enum_set() { do_test(static_cast(test_enum::COUNT) == idx); } +static void test_enum_array() { + say(L"Testing enum array"); + enum_array_t es{}; + do_test(es.size() == enum_count()); + es[test_enum::beta] = "abc"; + do_test(es[test_enum::beta] == "abc"); + es.at(test_enum::gamma) = "def"; + do_test(es.at(test_enum::gamma) == "def"); +} + /// Test sane escapes. static void test_unescape_sane() { const struct test_t { @@ -5115,6 +5125,7 @@ int main(int argc, char **argv) { if (should_test_function("env_vars")) test_env_vars(); if (should_test_function("str_to_num")) test_str_to_num(); if (should_test_function("enum")) test_enum_set(); + if (should_test_function("enum")) test_enum_array(); if (should_test_function("highlighting")) test_highlighting(); if (should_test_function("new_parser_ll2")) test_new_parser_ll2(); if (should_test_function("new_parser_fuzzing")) From fc9d2386423e418f9df054e98b9a5348c5cb2f10 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sat, 2 Feb 2019 15:39:04 -0800 Subject: [PATCH 03/10] Introduce topic monitoring topic_monitor allows for querying changes posted to one or more topics, initially sigchld. This will eventually replace the waitpid logic in process_mark_finished_children(). Comment from the new header: Topic monitoring support. Topics are conceptually "a thing that can happen." For example, delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means that that thing happened. Associated with each topic is a current generation, which is a 64 bit value. When you query a topic, you get back a generation. If on the next query the generation has increased, then it indicates someone posted to the topic. For example, if you are monitoring a child process, you can query the sigchld topic. If it has increased since your last query, it is possible that your child process has exited. Topic postings may be coalesced. That is there may be two posts to a given topic, yet the generation only increases by 1. The only guarantee is that after a topic post, the current generation value is larger than any value previously queried. Tying this all together is the topic_monitor_t. This provides the current topic generations, and also provides the ability to perform a blocking wait for any topic to change in a particular topic set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit. --- CMakeLists.txt | 2 +- Makefile.in | 2 +- src/enum_set.h | 2 +- src/fish_tests.cpp | 64 ++++++++++++++++ src/iothread.h | 1 + src/signal.cpp | 2 + src/topic_monitor.cpp | 114 +++++++++++++++++++++++++++++ src/topic_monitor.h | 165 ++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 349 insertions(+), 3 deletions(-) create mode 100644 src/topic_monitor.cpp create mode 100644 src/topic_monitor.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 32dee2061..eb7949f7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,7 +80,7 @@ SET(FISH_SRCS src/postfork.cpp src/proc.cpp src/reader.cpp src/sanity.cpp src/screen.cpp src/signal.cpp src/tinyexpr.cpp src/tnode.cpp src/tokenizer.cpp src/utf8.cpp src/util.cpp src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp - src/future_feature_flags.cpp src/redirection.cpp + src/future_feature_flags.cpp src/redirection.cpp src/topic_monitor.cpp ) # Header files are just globbed. diff --git a/Makefile.in b/Makefile.in index eba0b41be..beb3b9253 100644 --- a/Makefile.in +++ b/Makefile.in @@ -119,7 +119,7 @@ FISH_OBJS := obj/autoload.o obj/builtin.o obj/builtin_bg.o obj/builtin_bind.o ob obj/parser_keywords.o obj/path.o obj/postfork.o obj/proc.o obj/reader.o \ obj/sanity.o obj/screen.o obj/signal.o obj/tinyexpr.o obj/tokenizer.o obj/tnode.o obj/utf8.o \ obj/util.o obj/wcstringutil.o obj/wgetopt.o obj/wildcard.o obj/wutil.o \ - obj/future_feature_flags.o obj/redirection.o + obj/future_feature_flags.o obj/redirection.o obj/topic_monitor.o FISH_INDENT_OBJS := obj/fish_indent.o obj/print_help.o $(FISH_OBJS) diff --git a/src/enum_set.h b/src/enum_set.h index ea6e90b40..22eb8c66d 100644 --- a/src/enum_set.h +++ b/src/enum_set.h @@ -29,7 +29,7 @@ class enum_set_t : private std::bitset()> { public: enum_set_t() = default; - explicit enum_set_t(T v) { set(v); } + /*implicit*/ enum_set_t(T v) { set(v); } static enum_set_t from_raw(unsigned long v) { return enum_set_t{v}; } diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 7b02115c0..dc2522784 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -69,6 +70,7 @@ #include "signal.h" #include "tnode.h" #include "tokenizer.h" +#include "topic_monitor.h" #include "utf8.h" #include "util.h" #include "wcstringutil.h" @@ -5073,6 +5075,66 @@ void test_normalize_path() { do_test(path_normalize_for_cd(L"/abc/def/", L"../ghi/..") == L"/abc/ghi/.."); } +static void test_topic_monitor() { + say(L"Testing topic monitor"); + topic_monitor_t monitor; + generation_list_t gens{}; + constexpr auto t = topic_t::sigchld; + do_test(gens[t] == 0); + do_test(monitor.generation_for_topic(t) == 0); + auto changed = monitor.check(&gens, {t}, false /* wait */); + do_test(changed.none()); + do_test(gens[t] == 0); + + monitor.post(t); + changed = monitor.check(&gens, topic_set_t{t}, true /* wait */); + do_test(changed == topic_set_t{t}); + do_test(gens[t] == 1); + do_test(monitor.generation_for_topic(t) == 1); + + monitor.post(t); + do_test(monitor.generation_for_topic(t) == 2); + changed = monitor.check(&gens, topic_set_t{t}, true /* wait */); + do_test(changed == topic_set_t{t}); +} + +static void test_topic_monitor_torture() { + say(L"Torture-testing topic monitor"); + topic_monitor_t monitor; + const size_t thread_count = 64; + constexpr auto t = topic_t::sigchld; + std::vector gens; + gens.resize(thread_count, generation_list_t{}); + std::atomic post_count{}; + for (auto &gen : gens) { + gen = monitor.current_generations(); + post_count += 1; + monitor.post(t); + } + + std::atomic completed{}; + std::vector threads; + for (size_t i = 0; i < thread_count; i++) { + threads.emplace_back( + [&](size_t i) { + for (size_t j = 0; j < (1 << 11); j++) { + auto before = gens[i]; + auto changed = monitor.check(&gens[i], topic_set_t{t}, true /* wait */); + do_test(before[t] < gens[i][t]); + do_test(gens[i][t] <= post_count); + } + auto amt = completed.fetch_add(1, std::memory_order_relaxed); + }, + i); + } + + while (completed.load(std::memory_order_relaxed) < thread_count) { + post_count += 1; + monitor.post(t); + } + for (auto &t : threads) t.join(); +} + /// Main test. int main(int argc, char **argv) { UNUSED(argc); @@ -5192,6 +5254,8 @@ int main(int argc, char **argv) { if (should_test_function("maybe")) test_maybe(); if (should_test_function("layout_cache")) test_layout_cache(); if (should_test_function("normalize")) test_normalize_path(); + if (should_test_function("topics")) test_topic_monitor(); + if (should_test_function("topics")) test_topic_monitor_torture(); // history_tests_t::test_history_speed(); say(L"Encountered %d errors in low-level tests", err_count); diff --git a/src/iothread.h b/src/iothread.h index 5e677b639..39d99e539 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -2,6 +2,7 @@ #ifndef FISH_IOTHREAD_H #define FISH_IOTHREAD_H +#include #include #include diff --git a/src/signal.cpp b/src/signal.cpp index f1ab10581..8ca202330 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -15,6 +15,7 @@ #include "parser.h" #include "proc.h" #include "reader.h" +#include "topic_monitor.h" #include "wutil.h" // IWYU pragma: keep /// Struct describing an entry for the lookup table used to convert between signal names and signal @@ -262,6 +263,7 @@ static void handle_chld(int sig, siginfo_t *info, void *context) { if (reraise_if_forked_child(sig)) return; job_handle_signal(sig, info, context); default_handler(sig, info, context); + topic_monitor_t::principal().post(topic_t::sigchld); } // We have a sigalarm handler that does nothing. This is used in the signal torture test, to verify diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp new file mode 100644 index 000000000..2c28a7517 --- /dev/null +++ b/src/topic_monitor.cpp @@ -0,0 +1,114 @@ +#include "config.h" // IWYU pragma: keep + +#include "limits.h" +#include "topic_monitor.h" +#include "wutil.h" + +#include + +/// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a +/// pointless at-exit handler for the dtor. +static topic_monitor_t *const s_principal = new topic_monitor_t(); + +topic_monitor_t &topic_monitor_t::principal() { + // Do not attempt to move s_principal to a function-level static, it needs to be accessed from a + // signal handler so it must not be lazily created. + return *s_principal; +} + +topic_monitor_t::topic_monitor_t() { + // Set up our pipes. Assert it succeeds. + auto pipes = make_autoclose_pipes({}); + assert(pipes.has_value() && "Failed to make pubsub pipes"); + pipes_ = pipes.acquire(); + + // Make sure that our write side doesn't block, else we risk hanging in a signal handler. + // The read end must block to avoid spinning in await. + DIE_ON_FAILURE(make_fd_nonblocking(pipes_.write.fd())); +} + +topic_monitor_t::~topic_monitor_t() = default; + +void topic_monitor_t::post(topic_t topic) { + // Beware, we may be in a signal handler! + // Atomically update the pending topics. + auto rawtopics = topic_set_t{topic}.to_raw(); + auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed); + if ((oldtopics & rawtopics) == rawtopics) { + // No new bits were set. + return; + } + + // Ok, we changed one or more bits. Ensure the topic change is visible, and announce the change + // by writing a byte to the pipe. + std::atomic_thread_fence(std::memory_order_release); + ssize_t ret; + do { + // write() is async signal safe. + const uint8_t v = 0; + ret = write(pipes_.write.fd(), &v, sizeof v); + } while (ret < 0 && errno == EINTR); + // Ignore EAGAIN and other errors (which conceivably could occur during shutdown). +} + +void topic_monitor_t::await_metagen(generation_t mgen) { + // Fast check of the metagen before taking the lock. If it's changed we're done. + if (mgen != current_metagen()) return; + + // Take the lock (which may take a long time) and then check again. + std::unique_lock locker{wait_queue_lock_}; + if (mgen != current_metagen()) return; + + // Our metagen hasn't changed. Push our metagen onto the queue, then wait until we're the + // lowest. If multiple waiters are the lowest, then anyone can be the observer. + // Note the reason for picking the lowest metagen is to avoid a priority inversion where a lower + // metagen (therefore someone who should see changes) is blocked waiting for a higher metagen + // (who has already seen the changes). + wait_queue_.push(mgen); + while (wait_queue_.top() != mgen) { + wait_queue_notifier_.wait(locker); + } + wait_queue_.pop(); + + // We now have the lowest metagen in the wait queue. Notice we still hold the lock. + // Read until the metagen changes. It may already have changed. + // Note because changes are coalesced, we can read a lot, potentially draining the pipe. + while (mgen == current_metagen()) { + uint8_t ignored[PIPE_BUF]; + (void)read(pipes_.read.fd(), ignored, sizeof ignored); + } + + // Release the lock and wake up the remaining waiters. + locker.unlock(); + wait_queue_notifier_.notify_all(); +} + +topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) { + if (topics.none()) return topics; + + topic_set_t changed{}; + for (;;) { + // Load the topic list and see if anything has changed. + generation_list_t current = updated_gens(); + for (topic_t topic : topic_iter_t{}) { + if (topics.get(topic)) { + assert(gens->at(topic) <= current.at(topic) && + "Incoming gen count exceeded published count"); + if (gens->at(topic) < current.at(topic)) { + gens->at(topic) = current.at(topic); + changed.set(topic); + } + } + } + + // If we're not waiting, or something changed, then we're done. + if (!wait || changed.any()) { + break; + } + + // Try again. Note that we use the metagen corresponding to the topic list we just + // inspected, not the current one (which may have updates since we checked). + await_metagen(metagen_for(current)); + } + return changed; +} diff --git a/src/topic_monitor.h b/src/topic_monitor.h new file mode 100644 index 000000000..fb61f06e5 --- /dev/null +++ b/src/topic_monitor.h @@ -0,0 +1,165 @@ +#ifndef FISH_TOPIC_MONITOR_H +#define FISH_TOPIC_MONITOR_H + +#include "common.h" +#include "enum_set.h" +#include "io.h" + +#include +#include +#include +#include +#include +#include +#include + +/** Topic monitoring support. Topics are conceptually "a thing that can happen." For example, + delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means + that that thing happened. + + Associated with each topic is a current generation, which is a 64 bit value. When you query a + topic, you get back a generation. If on the next query the generation has increased, then it + indicates someone posted to the topic. + + For example, if you are monitoring a child process, you can query the sigchld topic. If it has + increased since your last query, it is possible that your child process has exited. + + Topic postings may be coalesced. That is there may be two posts to a given topic, yet the + generation only increases by 1. The only guarantee is that after a topic post, the current + generation value is larger than any value previously queried. + + Tying this all together is the topic_monitor_t. This provides the current topic generations, and + also provides the ability to perform a blocking wait for any topic to change in a particular topic + set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit. + */ + +/// The list of topics that may be observed. +enum class topic_t : uint8_t { + sigchld, // Corresponds to SIGCHLD signal. + COUNT +}; + +/// Allow enum_iter to be used. +template <> +struct enum_info_t { + static constexpr auto count = topic_t::COUNT; +}; + +/// Set of topics. +using topic_set_t = enum_set_t; + +/// Counting iterator for topics. +using topic_iter_t = enum_iter_t; + +/// A generation is a counter incremented every time the value of a topic changes. +/// It is 64 bit so it will never wrap. +using generation_t = uint64_t; + +/// A generation value which is guaranteed to never be set and be larger than any valid generation. +constexpr generation_t invalid_generation = std::numeric_limits::max(); + +/// List of generation values, indexed by topics. +/// The initial value of a generation is always 0. +using generation_list_t = enum_array_t; + +/// Teh topic monitor class. This permits querying the current generation values for topics, +/// optionally blocking until they increase. +class topic_monitor_t { + private: + using topic_set_raw_t = uint8_t; + + static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count(), + "topic_set_raw is too small"); + + /// The current topic generation list, protected by a mutex. Note this may be opportunistically + /// updated at the point it is queried. + owning_lock current_gen_{{}}; + + /// The set of topics which have pending increments. + /// This is managed via atomics. + std::atomic pending_updates_{}; + + /// When a topic set is queried in a blocking way, the waiters are put into a queue. The waiter + /// with the smallest metagen is responsible for announcing the change to the rest of the + /// waiters. (The metagen is just the sum of the current generations.) Note that this is a + /// max-heap that defaults to std::less; by using std::greater it becomes a min heap. This is + /// protected by wait_queue_lock_. + std::priority_queue, std::greater> + wait_queue_; + + /// Mutex guarding the wait queue. + std::mutex wait_queue_lock_{}; + + /// Condition variable for broadcasting notifications. + std::condition_variable wait_queue_notifier_{}; + + /// Pipes used to communicate changes from the signal handler. + autoclose_pipes_t pipes_; + + /// \return the metagen for a topic generation list. + /// The metagen is simply the sum of topic generations. Note it is monotone. + static inline generation_t metagen_for(const generation_list_t &lst) { + return std::accumulate(lst.begin(), lst.end(), generation_t{0}); + } + + /// Wait for the current metagen to become different from \p gen. + /// If it is already different, return immediately. + void await_metagen(generation_t gen); + + /// Return the current generation list, opportunistically applying any pending updates. + generation_list_t updated_gens() { + auto current_gens = current_gen_.acquire(); + + // Atomically acquire the pending updates, swapping in 0. + // If there are no pending updates (likely), just return. + // Otherwise CAS in 0 and update our topics. + const auto relaxed = std::memory_order_relaxed; + topic_set_raw_t raw; + bool cas_success; + do { + raw = pending_updates_.load(relaxed); + if (raw == 0) return *current_gens; + cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed); + } while (!cas_success); + + // Update the current generation with our topics and return it. + auto topics = topic_set_t::from_raw(raw); + for (topic_t topic : topic_iter_t{}) { + current_gens->at(topic) += topics.get(topic) ? 1 : 0; + } + return *current_gens; + } + + /// \return the metagen for the current topic generation list. + inline generation_t current_metagen() { return metagen_for(updated_gens()); } + + public: + topic_monitor_t(); + ~topic_monitor_t(); + + /// topic_monitors should not be copied, and there should be no reason to move one. + void operator=(const topic_monitor_t &) = delete; + topic_monitor_t(const topic_monitor_t &) = delete; + void operator=(topic_monitor_t &&) = delete; + topic_monitor_t(topic_monitor_t &&) = delete; + + /// The principal topic_monitor. This may be fetched from a signal handler. + static topic_monitor_t &principal(); + + /// Post to a topic, potentially from a signal handler. + void post(topic_t topic); + + /// Access the current generations. + generation_list_t current_generations() { return updated_gens(); } + + /// Access the generation for a topic. + generation_t generation_for_topic(topic_t topic) { return current_generations().at(topic); } + + /// See if for any topic (specified in \p topics) has changed from the values in the generation + /// list \p gens. If \p wait is set, then wait if there are no changes; otherwise return + /// immediately. + /// \return the set of topics that changed, updating the generation list \p gens. + topic_set_t check(generation_list_t *gens, topic_set_t topics, bool wait); +}; + +#endif From a4dc04a28e577a13f3748a0193c5e2d5d3792b36 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Thu, 14 Feb 2019 20:55:43 -0800 Subject: [PATCH 04/10] Add sighupint topic This corresponds to SIGHUP and SIGINT. This will be used to break out of process_mark_finished_children(). --- src/enum_set.h | 4 ++++ src/fish_tests.cpp | 14 ++++++++------ src/signal.cpp | 2 ++ src/topic_monitor.h | 3 ++- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/enum_set.h b/src/enum_set.h index 22eb8c66d..cbec399d0 100644 --- a/src/enum_set.h +++ b/src/enum_set.h @@ -31,6 +31,10 @@ class enum_set_t : private std::bitset()> { /*implicit*/ enum_set_t(T v) { set(v); } + /*implicit*/ enum_set_t(std::initializer_list vs) { + for (T v : vs) set(v); + } + static enum_set_t from_raw(unsigned long v) { return enum_set_t{v}; } unsigned long to_raw() const { return super::to_ulong(); } diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index dc2522784..538b05ebd 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -5102,14 +5102,15 @@ static void test_topic_monitor_torture() { say(L"Torture-testing topic monitor"); topic_monitor_t monitor; const size_t thread_count = 64; - constexpr auto t = topic_t::sigchld; + constexpr auto t1 = topic_t::sigchld; + constexpr auto t2 = topic_t::sighupint; std::vector gens; gens.resize(thread_count, generation_list_t{}); std::atomic post_count{}; for (auto &gen : gens) { gen = monitor.current_generations(); post_count += 1; - monitor.post(t); + monitor.post(t1); } std::atomic completed{}; @@ -5119,9 +5120,10 @@ static void test_topic_monitor_torture() { [&](size_t i) { for (size_t j = 0; j < (1 << 11); j++) { auto before = gens[i]; - auto changed = monitor.check(&gens[i], topic_set_t{t}, true /* wait */); - do_test(before[t] < gens[i][t]); - do_test(gens[i][t] <= post_count); + auto changed = monitor.check(&gens[i], topic_set_t{t1, t2}, true /* wait */); + do_test(before[t1] < gens[i][t1]); + do_test(gens[i][t1] <= post_count); + do_test(gens[i][t2] == 0); } auto amt = completed.fetch_add(1, std::memory_order_relaxed); }, @@ -5130,7 +5132,7 @@ static void test_topic_monitor_torture() { while (completed.load(std::memory_order_relaxed) < thread_count) { post_count += 1; - monitor.post(t); + monitor.post(t1); } for (auto &t : threads) t.join(); } diff --git a/src/signal.cpp b/src/signal.cpp index 8ca202330..927eb2dae 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -231,6 +231,7 @@ static void handle_hup(int sig, siginfo_t *info, void *context) { } else { reader_exit(1, 1); } + topic_monitor_t::principal().post(topic_t::sighupint); } /// Handle sigterm. The only thing we do is restore the front process ID, then die. @@ -249,6 +250,7 @@ static void handle_int(int sig, siginfo_t *info, void *context) { if (reraise_if_forked_child(sig)) return; reader_handle_sigint(); default_handler(sig, info, context); + topic_monitor_t::principal().post(topic_t::sighupint); } /// Non-interactive ^C handler. diff --git a/src/topic_monitor.h b/src/topic_monitor.h index fb61f06e5..f74c900ec 100644 --- a/src/topic_monitor.h +++ b/src/topic_monitor.h @@ -35,7 +35,8 @@ /// The list of topics that may be observed. enum class topic_t : uint8_t { - sigchld, // Corresponds to SIGCHLD signal. + sigchld, // Corresponds to SIGCHLD signal. + sighupint, // Corresponds to both SIGHUP and SIGINT signals. COUNT }; From a95bc849c57d02e4bc2b3b04dc504a2d14de9fd8 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sat, 16 Feb 2019 17:39:14 -0800 Subject: [PATCH 05/10] Rewrite process_mark_finished_children using topics This is a big change to how process reaping works, reimplenting it using topics. The idea is to simplify the logic in process_mark_finished_children around blocking, and also prepare for "internal processes" which do not correspond to real processes. Before this change, fish would use waitpid() to wait for a process group, OR would individually poll processes if the process group leader was unreapable. After this change, fish no longer ever calls blocking waitpid(). Instead fish uses the topic mechanism. For each reapable process, fish checks if it has received a SIGCHLD since last poll; if not it waits until the next SIGCHLD, and then polls them all. --- src/proc.cpp | 243 +++++++++---------------------------------------- src/proc.h | 23 ++++- src/signal.cpp | 1 - 3 files changed, 63 insertions(+), 204 deletions(-) diff --git a/src/proc.cpp b/src/proc.cpp index c52cf09dd..0561a60d4 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -361,194 +361,59 @@ void add_disowned_pgid(pid_t pgid) { } } -/// A static value tracking how many SIGCHLDs we have seen, which is used in a heurstic to -/// determine if we should call waitpid() at all in `process_mark_finished_children`. -static volatile process_generation_count_t s_sigchld_generation_cnt = 0; - -/// See if any children of a fully constructed job have exited or been killed, and mark them -/// accordingly. We cannot reap just any child that's exited, (as in, `waitpid(-1,…`) since -/// that may reap a pgrp leader that has exited but in a job with another process that has yet to -/// launch and join its pgrp (#5219). -/// \param block_on_fg when true, blocks waiting for the foreground job to finish. -/// \return whether the operation completed without incident -static bool process_mark_finished_children(bool block_on_fg) { +/// See if any reapable processes have exited, and mark them accordingly. +/// \param block_ok if no reapable processes have exited, block until one is (or until we receive a +/// signal). +static void process_mark_finished_children(bool block_ok) { ASSERT_IS_MAIN_THREAD(); - // We can't always use SIGCHLD to determine if waitpid() should be called since it is not - // strictly one-SIGCHLD-per-one-child-exited (i.e. multiple exits can share a SIGCHLD call) and - // we a) return immediately the first time a dead child is reaped, b) explicitly skip over jobs - // that aren't yet fully constructed, so it's possible that we can get SIGCHLD and even find a - // killed child in the jobs we are reaping, but also have an exited child process in a job that - // hasn't been fully constructed yet - which means we can end up never knowing about the exited - // child process in that job if we use SIGCHLD count as the only metric for whether or not - // waitpid() is called. - // Without this optimization, the slowdown caused by calling waitpid() even just once each time - // `process_mark_finished_children()` is called is rather obvious (see the performance-related - // discussion in #5219), making it worth the complexity of this heuristic. - - /// Tracks whether or not we received SIGCHLD without checking all jobs (due to jobs under - /// construction), forcing a full waitpid loop. - static bool dirty_state = true; - static process_generation_count_t last_sigchld_count = -1; - - // If the last time that we received a SIGCHLD we did not waitpid all jobs, we cannot early out. - if (!dirty_state && last_sigchld_count == s_sigchld_generation_cnt) { - // If we have foreground jobs, we need to block on them below - if (!block_on_fg) { - // We can assume that no children have exited and that all waitpid calls with - // WNOHANG below will confirm that. - return true; + // Get the exit and signal generations of all reapable processes. + // The exit generation tells us if we have an exit; the signal generation allows for detecting + // SIGHUP and SIGINT. + generation_list_t gens{}; + gens.fill(invalid_generation); + job_iterator_t jobs; + while (auto *j = jobs.next()) { + for (const auto &proc : j->processes) { + if (j->can_reap(proc.get())) { + gens[topic_t::sigchld] = + std::min(gens[topic_t::sigchld], proc->gens_[topic_t::sigchld]); + gens[topic_t::sighupint] = + std::min(gens[topic_t::sighupint], proc->gens_[topic_t::sighupint]); + } } } - last_sigchld_count = s_sigchld_generation_cnt; - bool jobs_skipped = false; - bool has_error = false; - job_t *job_fg = nullptr; + if (gens[topic_t::sigchld] == invalid_generation) { + // No reapable processes, nothing to wait for. + return; + } - // Reap only processes belonging to fully-constructed jobs to prevent reaping of processes - // before others in the same process group have a chance to join their pgrp. - job_iterator_t jobs; - while (auto j = jobs.next()) { - // (A job can have pgrp INVALID_PID if it consists solely of builtins that perform no IO) - if (j->pgid == INVALID_PID || !j->is_constructed()) { - debug(5, "Skipping wait on incomplete job %d (%ls)", j->job_id, j->preview().c_str()); - jobs_skipped = true; - continue; - } + // Now check for changes, optionally waiting. + topic_set_t topics{{topic_t::sigchld, topic_t::sighupint}}; + auto changed_topics = topic_monitor_t::principal().check(&gens, topics, block_ok); + if (changed_topics.none()) return; - if (j != job_fg && j->is_foreground() && !j->is_stopped() && !j->is_completed()) { - // Ensure that we don't have multiple fully constructed foreground jobs. - assert((!job_fg || !job_fg->job_chain_is_fully_constructed() || - !j->job_chain_is_fully_constructed()) && - "More than one active, fully-constructed foreground job!"); - job_fg = j; - } + // We got some changes. Since we last checked we received SIGCHLD, and or HUP/INT. + // Update the hup/int generations and reap any reapable processes. + jobs.reset(); + while (auto *j = jobs.next()) { + for (auto &proc : j->processes) { + // Update the signalhupint generation so we don't break on old sighupints. + proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint]; - // Whether we will wait for uncompleted processes depends on the combination of - // `block_on_fg` and the nature of the process. Default is WNOHANG, but if foreground, - // constructed, not stopped, *and* block_on_fg is true, then no WNOHANG (i.e. "HANG"). - int options = WUNTRACED | WNOHANG; - - // We should never block twice in the same go, as `waitpid()' returning could mean one - // process completed or many, and there is a race condition when calling `waitpid()` after - // the process group exits having reaped all children and terminated the process group and - // when a subsequent call to `waitpid()` for the same process group returns immediately if - // that process group no longer exists. i.e. it's possible for all processes to have exited - // but the process group to remain momentarily valid, in which case calling `waitpid()` - // without WNOHANG can cause an infinite wait. Additionally, only wait on external jobs that - // spawned new process groups (i.e. JOB_CONTROL). We do not break or return on error as we - // wait on only one pgrp at a time and we need to check all pgrps before returning, but we - // never wait/block on fg processes after an error has been encountered to give ourselves - // (elsewhere) a chance to handle the fallout from process termination, etc. - if (!has_error && block_on_fg && j == job_fg) { - debug(4, "Waiting on processes from foreground job %d", job_fg->pgid); - options &= ~WNOHANG; - } - - // Child jobs (produced via execution of functions) share job ids with their not-yet- - // fully-constructed parent jobs, so we have to wait on these by individual process id - // and not by the shared pgroup. End result is the same, but it just makes more calls - // to the kernel. - bool wait_by_process = !j->job_chain_is_fully_constructed(); - - // Firejail can result in jobs with pgroup 0, in which case we cannot wait by - // job id. See discussion in #5295. - if (j->pgid == 0) { - wait_by_process = true; - } - - // Cygwin does some voodoo with regards to process management that I do not understand, but - // long story short, we cannot reap processes by their pgroup. The way child processes are - // launched under Cygwin is... weird, and outwardly they do not appear to retain information - // about their parent process when viewed in Task Manager. Waiting on processes by their - // pgroup results in never reaping any, so we just wait on them by process id instead. - if (is_cygwin()) { - wait_by_process = true; - } - - // When waiting on processes individually in a pipeline, we need to enumerate in reverse - // order so that the first process we actually wait on (i.e. ~WNOHANG) is the last process - // in the IO chain, because that's the one that controls the lifetime of the foreground job - // - as long as it is still running, we are in the background and once it exits or is - // killed, all previous jobs in the IO pipeline must necessarily terminate as well. - auto process = j->processes.rbegin(); - // waitpid(2) returns 1 process each time, we need to keep calling it until we've reaped all - // children of the pgrp in question or else we can't reset the dirty_state flag. In all - // cases, calling waitpid(2) is faster than potentially calling select_try() on a process - // that has exited, which will force us to wait the full timeout before coming back here and - // calling waitpid() again. - while (true) { - int status; - pid_t pid; - - if (wait_by_process) { - // If the evaluation of a function resulted in the sharing of a pgroup between the - // real job and the job that shouldn't have been created as a separate job AND the - // parent job is still under construction (which is the case when continue_job() is - // first called on the child job during the recursive call to exec_job() before the - // parent job has been fully constructed), we need to call waitpid(2) on the - // individual processes of the child job instead of using a catch-all waitpid(2) - // call on the job's process group. - if (process == j->processes.rend()) { - break; - } - assert((*process)->pid != INVALID_PID && "Waiting by process on an invalid PID!"); - if ((*process)->completed) { - // This process has already been waited on to completion - process++; - continue; - } - - if ((options & WNOHANG) == 0) { - debug(4, "Waiting on individual process %d: %ls", (*process)->pid, (*process)->argv0()); - } else { - debug(4, "waitpid with WNOHANG on individual process %d", (*process)->pid); - } - pid = waitpid((*process)->pid, &status, options); - - process++; - } else { - // A negative PID passed in to `waitpid()` means wait on any child in that process - // group - pid = waitpid(-1 * j->pgid, &status, options); - } - - if (pid > 0) { - // A child process has been reaped - debug(4, "Reaped PID %d", pid); - handle_child_status(pid, status); - - // Always set WNOHANG (that is, don't hang). Otherwise we might wait on a non-stopped job - // that becomes stopped, but we don't refresh our view of the process state before - // calling waitpid(2) again here. - options |= WNOHANG; - } else if (pid == 0 || errno == ECHILD) { - // No killed/dead children in this particular process group - if (!wait_by_process) { - if ((options & WNOHANG) == 0) { - // This normally implies that the job has completed, but if we try to wait - // on a job that includes a process that changed its own group before we - // enter `waitpid`, we will be waiting forever. See #5596 for such a case. - wait_by_process = true; - continue; + // Try reaping processes whose sigchld count is below what was returned. + if (changed_topics.get(topic_t::sigchld)) { + if (j->can_reap(proc.get()) && + proc->gens_[topic_t::sigchld] < gens[topic_t::sigchld]) { + proc->gens_[topic_t::sigchld] = gens[topic_t::sigchld]; + int status = 0; + auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED); + if (pid > 0) { + debug(4, "Reaped PID %d", pid); + handle_child_status(pid, status); } - break; } - } else { - // pid < 0 indicates an error. One likely failure is ECHILD (no children), which is - // not an error and is ignored in the branch above. The other likely failure is - // EINTR, which means we got a signal, which is considered an error. We absolutely - // do not break or return on error, as we need to iterate over all constructed jobs - // but we only call waitpid for one pgrp at a time. We do bypass future waits in - // case of error, however. - has_error = true; - - // Do not audibly complain on interrupt (see #5293) - if (errno != EINTR) { - wperror(L"waitpid in process_mark_finished_children"); - } - break; } } } @@ -559,28 +424,6 @@ static bool process_mark_finished_children(bool block_on_fg) { s_disowned_pids.erase(std::remove_if(s_disowned_pids.begin(), s_disowned_pids.end(), [&status](pid_t pid) { return waitpid(pid, &status, WNOHANG) > 0; }), s_disowned_pids.end()); - - // Yes, the below can be collapsed to a single line, but it's worth being explicit about it with - // the comments. Fret not, the compiler will optimize it. (It better!) - if (jobs_skipped) { - // We received SIGCHLD but were not able to definitely say whether or not all children were - // reaped. - dirty_state = true; - } else { - // We can safely assume that no SIGCHLD means we can just return next time around - dirty_state = false; - } - - return !has_error; -} - -/// This is called from a signal handler. The signal is always SIGCHLD. -void job_handle_signal(int signal, siginfo_t *info, void *context) { - UNUSED(signal); - UNUSED(info); - UNUSED(context); - // This is the only place that this generation count is modified. It's OK if it overflows. - s_sigchld_generation_cnt += 1; } /// Given a command like "cat file", truncate it to a reasonable length. diff --git a/src/proc.h b/src/proc.h index 03472029d..170346ea1 100644 --- a/src/proc.h +++ b/src/proc.h @@ -20,6 +20,7 @@ #include "io.h" #include "parse_tree.h" #include "tnode.h" +#include "topic_monitor.h" /// Types of processes. enum process_type_t { @@ -114,6 +115,10 @@ class process_t { /// Actual command to pass to exec in case of EXTERNAL or INTERNAL_EXEC. wcstring actual_cmd; + + /// Generation counts for reaping. + generation_list_t gens_{}; + /// Process ID pid_t pid{0}; /// File descriptor that pipe output should bind to. @@ -200,6 +205,21 @@ class job_t { /// Sets the command. void set_command(const wcstring &cmd) { command_str = cmd; } + /// \return whether it is OK to reap a given process. Sometimes we want to defer reaping a + /// process if it is the group leader and the job is not yet constructed, because then we might + /// also reap the process group and then we cannot add new processes to the group. + bool can_reap(const process_t *p) const { + if (p->pid <= 0) { + // Can't reap without a pid. + return false; + } + if (!is_constructed() && pgid > 0 && p->pid == pgid) { + // p is the the group leader in an under-construction job. + return false; + } + return true; + } + /// Returns a truncated version of the job string. Used when a message has already been emitted /// containing the full job string and job id, but using the job id alone would be confusing /// due to reuse of freed job ids. Prevents overloading the debug comments with the full, @@ -359,9 +379,6 @@ int proc_get_last_status(); /// \param interactive whether interactive jobs should be reaped as well bool job_reap(bool interactive); -/// Signal handler for SIGCHLD. Mark any processes with relevant information. -void job_handle_signal(int signal, siginfo_t *info, void *con); - /// Mark a process as failed to execute (and therefore completed). void job_mark_process_as_failed(const std::shared_ptr &job, const process_t *p); diff --git a/src/signal.cpp b/src/signal.cpp index 927eb2dae..8c2886650 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -263,7 +263,6 @@ static void handle_int_notinteractive(int sig, siginfo_t *info, void *context) { /// sigchld handler. Does notification and calls the handler in proc.c. static void handle_chld(int sig, siginfo_t *info, void *context) { if (reraise_if_forked_child(sig)) return; - job_handle_signal(sig, info, context); default_handler(sig, info, context); topic_monitor_t::principal().post(topic_t::sigchld); } From ebe2dc2766cff306f7e2d4b9ebfd851dba038f00 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sat, 16 Feb 2019 17:35:16 -0800 Subject: [PATCH 06/10] Processes to record topic generations before execution The sigchld generation expresses the idea that, if we receive a sigchld signal, the generation will be different than when we last recorded it. A process cannot exit before it has launched, so check the generation count before process launch. This is an optimization that reduces failing waitpid calls. --- src/exec.cpp | 1 + src/proc.cpp | 6 +++++- src/proc.h | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/exec.cpp b/src/exec.cpp index bf311bb74..c9d662464 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -899,6 +899,7 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< } // Execute the process. + p->check_generations_before_launch(); switch (p->type) { case INTERNAL_FUNCTION: case INTERNAL_BLOCK_NODE: { diff --git a/src/proc.cpp b/src/proc.cpp index 0561a60d4..667d76ace 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -324,7 +324,11 @@ static void handle_child_status(pid_t pid, int status) { } } -process_t::process_t() {} +process_t::process_t() = default; + +void process_t::check_generations_before_launch() { + gens_ = topic_monitor_t::principal().current_generations(); +} job_t::job_t(job_id_t jobid, io_chain_t bio, std::shared_ptr parent) : block_io(std::move(bio)), diff --git a/src/proc.h b/src/proc.h index 170346ea1..7379bd065 100644 --- a/src/proc.h +++ b/src/proc.h @@ -113,6 +113,11 @@ class process_t { void set_io_chain(const io_chain_t &chain) { this->process_io_chain = chain; } + /// Store the current topic generations. That is, right before the process is launched, record + /// the generations of all topics; then we can tell which generation values have changed after + /// launch. This helps us avoid spurious waitpid calls. + void check_generations_before_launch(); + /// Actual command to pass to exec in case of EXTERNAL or INTERNAL_EXEC. wcstring actual_cmd; From 061f8f49c6cec400599d41c295598642706a252e Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sat, 2 Feb 2019 12:52:51 -0800 Subject: [PATCH 07/10] Add dup2_list_t::fd_for_target_fd This adds an "in-process" interpretation of dup2s, allowing for fish to output directly to the correct file descriptor without having to perform an in-kernel dup2 sequence. --- src/fish_tests.cpp | 25 +++++++++++++++++++++++++ src/redirection.cpp | 20 ++++++++++++++++++++ src/redirection.h | 5 +++++ 3 files changed, 50 insertions(+) diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 538b05ebd..5f83e1c06 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -2422,6 +2422,30 @@ static void test_dup2s() { do_test(!list.has_value()); } +static void test_dup2s_fd_for_target_fd() { + using std::make_shared; + io_chain_t chain; + // note io_fd_t params are backwards from dup2. + chain.push_back(make_shared(10)); + chain.push_back(make_shared(9, 10, true)); + chain.push_back(make_shared(5, 8, true)); + chain.push_back(make_shared(1, 4, true)); + chain.push_back(make_shared(3, 5, true)); + auto list = dup2_list_t::resolve_chain(chain); + + do_test(list.has_value()); + do_test(list->fd_for_target_fd(3) == 8); + do_test(list->fd_for_target_fd(5) == 8); + do_test(list->fd_for_target_fd(8) == 8); + do_test(list->fd_for_target_fd(1) == 4); + do_test(list->fd_for_target_fd(4) == 4); + do_test(list->fd_for_target_fd(100) == 100); + do_test(list->fd_for_target_fd(0) == 0); + do_test(list->fd_for_target_fd(-1) == -1); + do_test(list->fd_for_target_fd(9) == -1); + do_test(list->fd_for_target_fd(10) == -1); +} + /// Testing colors. static void test_colors() { say(L"Testing colors"); @@ -5223,6 +5247,7 @@ int main(int argc, char **argv) { if (should_test_function("test")) test_test(); if (should_test_function("wcstod")) test_wcstod(); if (should_test_function("dup2s")) test_dup2s(); + if (should_test_function("dup2s")) test_dup2s_fd_for_target_fd(); if (should_test_function("path")) test_path(); if (should_test_function("pager_navigation")) test_pager_navigation(); if (should_test_function("pager_layout")) test_pager_layout(); diff --git a/src/redirection.cpp b/src/redirection.cpp index 64acdda23..c0c1b6985 100644 --- a/src/redirection.cpp +++ b/src/redirection.cpp @@ -88,3 +88,23 @@ maybe_t dup2_list_t::resolve_chain(const io_chain_t &io_chain) { } return {std::move(result)}; } + +int dup2_list_t::fd_for_target_fd(int target) const { + // Paranoia. + if (target < 0) { + return target; + } + // Note we can simply walk our action list backwards, looking for src -> target dups. + int cursor = target; + for (auto iter = actions_.rbegin(); iter != actions_.rend(); ++iter) { + if (iter->target == cursor) { + // cursor is replaced by iter->src + cursor = iter->src; + } else if (iter->src == cursor && iter->target < 0) { + // cursor is closed. + cursor = -1; + break; + } + } + return cursor; +} diff --git a/src/redirection.h b/src/redirection.h index 64fd60043..16e8c155b 100644 --- a/src/redirection.h +++ b/src/redirection.h @@ -60,6 +60,11 @@ class dup2_list_t { /// The result contains the list of fd actions (dup2 and close), as well as the list /// of fds opened. static maybe_t resolve_chain(const io_chain_t &); + + /// \return the fd ultimately dup'd to a target fd, or -1 if the target is closed. + /// For example, if target fd is 1, and we have a dup2 chain 5->3 and 3->1, then we will return 5. + /// If the target is not referenced in the chain, returns target. + int fd_for_target_fd(int target) const; }; #endif From ada8ea954e0891b9aab9fc8310ba9393aa606ad2 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Wed, 13 Feb 2019 15:17:07 -0800 Subject: [PATCH 08/10] Use "internal" processes to write buffered output This introduces "internal processes" which are backed by a pthread instead of a normal process. Internal processes are reaped using the topic machinery, plugging in neatly alongside the sigchld topic; this means that process_mark_finished_children() can wait for internal and external processes simultaneously. Initially internal processes replace the forked process that fish uses to write out the output of blocks and functions. --- src/common.h | 6 +++ src/exec.cpp | 106 ++++++++++++++++++++++++++++++++------------ src/fish_tests.cpp | 2 +- src/io.cpp | 3 +- src/proc.cpp | 60 +++++++++++++++++-------- src/proc.h | 44 ++++++++++++++++-- src/topic_monitor.h | 5 ++- 7 files changed, 171 insertions(+), 55 deletions(-) diff --git a/src/common.h b/src/common.h index 1433ad151..b74d6d98d 100644 --- a/src/common.h +++ b/src/common.h @@ -307,6 +307,12 @@ void vec_append(std::vector &receiver, std::vector &&donator) { std::make_move_iterator(donator.end())); } +/// Move an object into a shared_ptr. +template +std::shared_ptr move_to_sharedptr(T &&v) { + return std::make_shared(std::move(v)); +} + /// Print a stack trace to stderr. void show_stackframe(const wchar_t msg_level, int frame_count = 100, int skip_levels = 0); diff --git a/src/exec.cpp b/src/exec.cpp index c9d662464..dfd7cf8ef 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -34,6 +34,7 @@ #include "fallback.h" // IWYU pragma: keep #include "function.h" #include "io.h" +#include "iothread.h" #include "parse_tree.h" #include "parser.h" #include "postfork.h" @@ -55,16 +56,6 @@ /// Base open mode to pass to calls to open. #define OPEN_MASK 0666 -/// Called in a forked child. -static void exec_write_and_exit(int fd, const char *buff, size_t count, int status) { - if (write_loop(fd, buff, count) == -1) { - debug(0, WRITE_ERROR); - wperror(L"write"); - exit_without_destructors(status); - } - exit_without_destructors(status); -} - void exec_close(int fd) { ASSERT_IS_MAIN_THREAD(); @@ -361,6 +352,80 @@ static void on_process_created(const std::shared_ptr &j, pid_t child_pid) } } +/// Construct an internal process for the process p. In the background, write the data \p outdata to +/// stdout, respecting the io chain \p ios. For example if target_fd is 1 (stdout), and there is a +/// dup2 3->1, then we need to write to fd 3. Then exit the internal process. +static bool run_internal_process(process_t *p, std::string outdata, io_chain_t ios) { + p->check_generations_before_launch(); + + // We want both the dup2s and the io_chain_ts to be kept alive by the background thread, because + // they may own an fd that we want to write to. Move them all to a shared_ptr. The strings as + // well (they may be long). + // Construct a little helper struct to make it simpler to move into our closure without copying. + struct write_fields_t { + int src_outfd{-1}; + std::string outdata{}; + + io_chain_t ios{}; + maybe_t dup2s{}; + std::shared_ptr internal_proc{}; + + int success_status{}; + + bool skip_out() const { return outdata.empty() || src_outfd < 0; } + }; + + auto f = std::make_shared(); + f->outdata = std::move(outdata); + + // Construct and assign the internal process to the real process. + p->internal_proc_ = std::make_shared(); + f->internal_proc = p->internal_proc_; + + // Resolve the IO chain. + // Note it's important we do this even if we have no out or err data, because we may have been + // asked to truncate a file (e.g. `echo -n '' > /tmp/truncateme.txt'). The open() in the dup2 + // list resolution will ensure this happens. + f->dup2s = dup2_list_t::resolve_chain(ios); + if (!f->dup2s) { + return false; + } + + // Figure out which source fds to write to. If they are closed (unlikely) we just exit + // successfully. + f->src_outfd = f->dup2s->fd_for_target_fd(STDOUT_FILENO); + + // If we have nothing to right we can elide the thread. + // TODO: support eliding output to /dev/null. + if (f->skip_out()) { + f->internal_proc->mark_exited(EXIT_SUCCESS); + return true; + } + + // Ensure that ios stays alive, it may own fds. + f->ios = ios; + + // If our process is a builtin, it will have already set its status value. Make sure we + // propagate that if our I/O succeeds and don't read it on a background thread. TODO: have + // builtin_run provide this directly, rather than setting it in the process. + f->success_status = p->status; + + iothread_perform([f]() { + int status = f->success_status; + if (!f->skip_out()) { + ssize_t ret = write_loop(f->src_outfd, f->outdata.data(), f->outdata.size()); + if (ret < 0) { + if (errno != EPIPE) { + wperror(L"write"); + } + if (!status) status = 1; + } + } + f->internal_proc->mark_exited(status); + }); + return true; +} + /// Call fork() as part of executing a process \p p in a job \j. Execute \p child_action in the /// context of the child. Returns true if fork succeeded, false if fork failed. static bool fork_child_for_process(const std::shared_ptr &job, process_t *p, @@ -784,24 +849,9 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr io_chain.remove(block_output_bufferfill); auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill)); - // Resolve our IO chain to a sequence of dup2s. - auto dup2s = dup2_list_t::resolve_chain(io_chain); - if (!dup2s) { - return false; - } - - const std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); - const char *buffer = buffer_contents.data(); - size_t count = buffer_contents.size(); - if (count > 0) { - // We don't have to drain threads here because our child process is simple. - const char *fork_reason = - p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io"; - if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] { - exec_write_and_exit(STDOUT_FILENO, buffer, count, status); - })) { - return false; - } + std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); + if (!buffer_contents.empty()) { + return run_internal_process(p, std::move(buffer_contents), io_chain); } else { if (p->is_last_in_job) { proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status); diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 5f83e1c06..351e49a77 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -5106,7 +5106,7 @@ static void test_topic_monitor() { constexpr auto t = topic_t::sigchld; do_test(gens[t] == 0); do_test(monitor.generation_for_topic(t) == 0); - auto changed = monitor.check(&gens, {t}, false /* wait */); + auto changed = monitor.check(&gens, topic_set_t{t}, false /* wait */); do_test(changed.none()); do_test(gens[t] == 0); diff --git a/src/io.cpp b/src/io.cpp index 6f72c3eb0..67a9c03d5 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -13,6 +13,7 @@ #include "fallback.h" // IWYU pragma: keep #include "io.h" #include "iothread.h" +#include "redirection.h" #include "wutil.h" // IWYU pragma: keep io_data_t::~io_data_t() = default; @@ -122,7 +123,7 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { // We want our background thread to own the fd but it's not easy to move into a std::function. // Use a shared_ptr. - auto fdref = std::make_shared(std::move(fd)); + auto fdref = move_to_sharedptr(std::move(fd)); // Our function to read until the receiver is closed. // It's OK to capture 'this' by value because 'this' owns the background thread and joins it diff --git a/src/proc.cpp b/src/proc.cpp index 667d76ace..a8ebc4793 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -247,6 +247,13 @@ bool job_t::signal(int signal) { return true; } +void internal_proc_t::mark_exited(int status) { + assert(!exited() && "Process is already exited"); + exited_.store(true, std::memory_order_relaxed); + status_.store(status, std::memory_order_release); + topic_monitor_t::principal().post(topic_t::internal_exit); +} + static void mark_job_complete(const job_t *j) { for (auto &p : j->processes) { p->completed = 1; @@ -374,48 +381,63 @@ static void process_mark_finished_children(bool block_ok) { // Get the exit and signal generations of all reapable processes. // The exit generation tells us if we have an exit; the signal generation allows for detecting // SIGHUP and SIGINT. + // Get the gen count of all reapable processes. + topic_set_t reaptopics{}; generation_list_t gens{}; gens.fill(invalid_generation); job_iterator_t jobs; while (auto *j = jobs.next()) { for (const auto &proc : j->processes) { - if (j->can_reap(proc.get())) { - gens[topic_t::sigchld] = - std::min(gens[topic_t::sigchld], proc->gens_[topic_t::sigchld]); + if (auto mtopic = j->reap_topic_for_process(proc.get())) { + topic_t topic = *mtopic; + reaptopics.set(topic); + gens[topic] = std::min(gens[topic], proc->gens_[topic]); + + reaptopics.set(topic_t::sighupint); gens[topic_t::sighupint] = std::min(gens[topic_t::sighupint], proc->gens_[topic_t::sighupint]); } } } - if (gens[topic_t::sigchld] == invalid_generation) { + if (reaptopics.none()) { // No reapable processes, nothing to wait for. return; } // Now check for changes, optionally waiting. - topic_set_t topics{{topic_t::sigchld, topic_t::sighupint}}; - auto changed_topics = topic_monitor_t::principal().check(&gens, topics, block_ok); + auto changed_topics = topic_monitor_t::principal().check(&gens, reaptopics, block_ok); if (changed_topics.none()) return; // We got some changes. Since we last checked we received SIGCHLD, and or HUP/INT. // Update the hup/int generations and reap any reapable processes. jobs.reset(); while (auto *j = jobs.next()) { - for (auto &proc : j->processes) { - // Update the signalhupint generation so we don't break on old sighupints. - proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint]; + for (const auto &proc : j->processes) { + if (auto mtopic = j->reap_topic_for_process(proc.get())) { + // Update the signal hup/int gen. + proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint]; - // Try reaping processes whose sigchld count is below what was returned. - if (changed_topics.get(topic_t::sigchld)) { - if (j->can_reap(proc.get()) && - proc->gens_[topic_t::sigchld] < gens[topic_t::sigchld]) { - proc->gens_[topic_t::sigchld] = gens[topic_t::sigchld]; - int status = 0; - auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED); - if (pid > 0) { - debug(4, "Reaped PID %d", pid); - handle_child_status(pid, status); + if (proc->gens_[*mtopic] < gens[*mtopic]) { + // Potentially reapable. Update its gen count and try reaping it. + proc->gens_[*mtopic] = gens[*mtopic]; + if (proc->internal_proc_) { + // Try reaping an internal process. + if (proc->internal_proc_->exited()) { + proc->status = proc->internal_proc_->get_status(); + proc->completed = true; + } + } else if (proc->pid > 0) { + // Try reaping an external process. + int status = -1; + auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED); + if (pid > 0) { + assert(pid == proc->pid && "Unexpcted waitpid() return"); + debug(4, "Reaped PID %d", pid); + handle_child_status(pid, status); + } + } else { + assert(0 && "Don't know how to reap this process"); } } } diff --git a/src/proc.h b/src/proc.h index 7379bd065..c588cf766 100644 --- a/src/proc.h +++ b/src/proc.h @@ -42,6 +42,28 @@ enum { JOB_CONTROL_NONE, }; +/// A structure representing a "process" internal to fish. This is backed by a pthread instead of a +/// separate process. +class internal_proc_t { + /// Whether the process has exited. + std::atomic exited_{}; + + /// If the process has exited, its status code. + std::atomic status_{}; + + public: + /// \return if this process has exited. + bool exited() const { return exited_.load(std::memory_order_relaxed); } + + /// Mark this process as exited, with the given status. + void mark_exited(int status); + + int get_status() const { + assert(exited() && "Process is not exited"); + return status_.load(std::memory_order_acquire); + } +}; + /// A structure representing a single fish process. Contains variables for tracking process state /// and the process argument list. Actually, a fish process can be either a regular external /// process, an internal builtin which may or may not spawn a fake IO process during execution, a @@ -126,6 +148,10 @@ class process_t { /// Process ID pid_t pid{0}; + + /// If we are an "internal process," that process. + std::shared_ptr internal_proc_{}; + /// File descriptor that pipe output should bind to. int pipe_write_fd{0}; /// True if process has completed. @@ -214,15 +240,25 @@ class job_t { /// process if it is the group leader and the job is not yet constructed, because then we might /// also reap the process group and then we cannot add new processes to the group. bool can_reap(const process_t *p) const { - if (p->pid <= 0) { + // Internal processes can always be reaped. + if (p->internal_proc_) { + return true; + } else if (p->pid <= 0) { // Can't reap without a pid. return false; - } - if (!is_constructed() && pgid > 0 && p->pid == pgid) { + } else if (!is_constructed() && pgid > 0 && p->pid == pgid) { // p is the the group leader in an under-construction job. return false; + } else { + return true; } - return true; + } + + /// \returns the reap topic for a process, which describes the manner in which we are reaped. A + /// none returns means don't reap, or perhaps defer reaping. + maybe_t reap_topic_for_process(const process_t *p) const { + if (p->completed || !can_reap(p)) return none(); + return p->internal_proc_ ? topic_t::internal_exit : topic_t::sigchld; } /// Returns a truncated version of the job string. Used when a message has already been emitted diff --git a/src/topic_monitor.h b/src/topic_monitor.h index f74c900ec..78498c4cc 100644 --- a/src/topic_monitor.h +++ b/src/topic_monitor.h @@ -35,8 +35,9 @@ /// The list of topics that may be observed. enum class topic_t : uint8_t { - sigchld, // Corresponds to SIGCHLD signal. - sighupint, // Corresponds to both SIGHUP and SIGINT signals. + sigchld, // Corresponds to SIGCHLD signal. + sighupint, // Corresponds to both SIGHUP and SIGINT signals. + internal_exit, // Corresponds to an internal process exit. COUNT }; From 4a2fd443b220d0957afe51f69509d8f18100ae74 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Wed, 13 Feb 2019 15:17:18 -0800 Subject: [PATCH 09/10] Use internal processes to write builtin output This uses the new internal process mechanism to write output for builtins. After this the only reason fish ever forks is to execute external processes. --- src/exec.cpp | 48 ++++++++++++++++++++++++++---------------------- src/proc.cpp | 23 ++++++++++++++--------- 2 files changed, 40 insertions(+), 31 deletions(-) diff --git a/src/exec.cpp b/src/exec.cpp index dfd7cf8ef..3b283b638 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -353,9 +353,11 @@ static void on_process_created(const std::shared_ptr &j, pid_t child_pid) } /// Construct an internal process for the process p. In the background, write the data \p outdata to -/// stdout, respecting the io chain \p ios. For example if target_fd is 1 (stdout), and there is a -/// dup2 3->1, then we need to write to fd 3. Then exit the internal process. -static bool run_internal_process(process_t *p, std::string outdata, io_chain_t ios) { +/// stdout and \p errdata to stderr, respecting the io chain \p ios. For example if target_fd is 1 +/// (stdout), and there is a dup2 3->1, then we need to write to fd 3. Then exit the internal +/// process. +static bool run_internal_process(process_t *p, std::string outdata, std::string errdata, + io_chain_t ios) { p->check_generations_before_launch(); // We want both the dup2s and the io_chain_ts to be kept alive by the background thread, because @@ -366,6 +368,9 @@ static bool run_internal_process(process_t *p, std::string outdata, io_chain_t i int src_outfd{-1}; std::string outdata{}; + int src_errfd{-1}; + std::string errdata{}; + io_chain_t ios{}; maybe_t dup2s{}; std::shared_ptr internal_proc{}; @@ -373,10 +378,13 @@ static bool run_internal_process(process_t *p, std::string outdata, io_chain_t i int success_status{}; bool skip_out() const { return outdata.empty() || src_outfd < 0; } + + bool skip_err() const { return errdata.empty() || src_errfd < 0; } }; auto f = std::make_shared(); f->outdata = std::move(outdata); + f->errdata = std::move(errdata); // Construct and assign the internal process to the real process. p->internal_proc_ = std::make_shared(); @@ -394,10 +402,11 @@ static bool run_internal_process(process_t *p, std::string outdata, io_chain_t i // Figure out which source fds to write to. If they are closed (unlikely) we just exit // successfully. f->src_outfd = f->dup2s->fd_for_target_fd(STDOUT_FILENO); + f->src_errfd = f->dup2s->fd_for_target_fd(STDERR_FILENO); // If we have nothing to right we can elide the thread. // TODO: support eliding output to /dev/null. - if (f->skip_out()) { + if (f->skip_out() && f->skip_err()) { f->internal_proc->mark_exited(EXIT_SUCCESS); return true; } @@ -421,6 +430,15 @@ static bool run_internal_process(process_t *p, std::string outdata, io_chain_t i if (!status) status = 1; } } + if (!f->skip_err()) { + ssize_t ret = write_loop(f->src_errfd, f->errdata.data(), f->errdata.size()); + if (ret < 0) { + if (errno != EPIPE) { + wperror(L"write"); + } + if (!status) status = 1; + } + } f->internal_proc->mark_exited(status); }); return true; @@ -650,26 +668,12 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, // in the child. // // These strings may contain embedded nulls, so don't treat them as C strings. - const std::string outbuff_str = wcs2string(stdout_stream.contents()); - const char *outbuff = outbuff_str.data(); - size_t outbuff_len = outbuff_str.size(); - - const std::string errbuff_str = wcs2string(stderr_stream.contents()); - const char *errbuff = errbuff_str.data(); - size_t errbuff_len = errbuff_str.size(); - - // Resolve our IO chain to a sequence of dup2s. - auto dup2s = dup2_list_t::resolve_chain(*io_chain); - if (!dup2s) { - return false; - } + std::string outbuff = wcs2string(stdout_stream.contents()); + std::string errbuff = wcs2string(stderr_stream.contents()); fflush(stdout); fflush(stderr); - if (!fork_child_for_process(j, p, *dup2s, false, "internal builtin", [&] { - do_builtin_io(outbuff, outbuff_len, errbuff, errbuff_len); - exit_without_destructors(p->status); - })) { + if (!run_internal_process(p, std::move(outbuff), std::move(errbuff), *io_chain)) { return false; } } @@ -851,7 +855,7 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); if (!buffer_contents.empty()) { - return run_internal_process(p, std::move(buffer_contents), io_chain); + return run_internal_process(p, std::move(buffer_contents), {} /*errdata*/, io_chain); } else { if (p->is_last_in_job) { proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status); diff --git a/src/proc.cpp b/src/proc.cpp index a8ebc4793..e0bfff3e9 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -922,15 +922,20 @@ void job_t::continue_job(bool send_sigcont) { } } - if (is_foreground()) { - if (is_completed()) { - // Set $status only if we are in the foreground and the last process in the job has - // finished and is not a short-circuited builtin. - auto &p = processes.back(); - if ((WIFEXITED(p->status) || WIFSIGNALED(p->status)) && p->pid) { - int status = proc_format_status(p->status); - proc_set_last_status(get_flag(job_flag_t::NEGATE) ? !status : status); - } + if (is_foreground() && is_completed()) { + // Set $status only if we are in the foreground and the last process in the job has + // finished and is not a short-circuited builtin. + bool negate = get_flag(job_flag_t::NEGATE); + auto &p = processes.back(); + if (p->internal_proc_) { + // Here the status is synthetic - not associated with a real exited process. + // TODO: clean this up, we shouldn't store the process's exit status in an unparsed + // state. + int status = p->status; + proc_set_last_status(negate ? !status : status); + } else if ((WIFEXITED(p->status) || WIFSIGNALED(p->status)) && p->pid) { + int status = proc_format_status(p->status); + proc_set_last_status(negate ? !status : status); } } } From 0b3eca1743f3a62a15ee6ab3bff4644207ebc6d0 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sun, 17 Feb 2019 14:16:47 -0800 Subject: [PATCH 10/10] Cleanup handle_builtin_output Now that we use an internal process to perform builtin output, simplify the logic around how it is performed. In particular we no longer have to be careful about async-safe functions since we do not fork. Also fix a bunch of comments that no longer apply. --- src/exec.cpp | 137 ++++++++++++++++++++++------------------------- src/io.cpp | 1 + src/io.h | 23 ++++---- src/postfork.cpp | 24 --------- src/postfork.h | 3 -- 5 files changed, 74 insertions(+), 114 deletions(-) diff --git a/src/exec.cpp b/src/exec.cpp index 3b283b638..9b29ed8f1 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -73,12 +73,11 @@ void exec_close(int fd) { } /// Returns true if the redirection is a file redirection to a file other than /dev/null. -static bool redirection_is_to_real_file(const io_data_t *io) { +static bool redirection_is_to_real_file(const shared_ptr &io) { bool result = false; - if (io != NULL && io->io_mode == io_mode_t::file) { + if (io && io->io_mode == io_mode_t::file) { // It's a file redirection. Compare the path to /dev/null. - const io_file_t *io_file = static_cast(io); - const char *path = io_file->filename_cstr; + const char *path = static_cast(io.get())->filename_cstr; if (strcmp(path, "/dev/null") != 0) { // It's not /dev/null. result = true; @@ -588,71 +587,71 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr &j, process_t *p, io_chain_t *io_chain, const io_streams_t &builtin_io_streams) { assert(p->type == INTERNAL_BUILTIN && "Process is not a builtin"); - // Handle output from builtin commands. In the general case, this means forking of a - // worker process, that will write out the contents of the stdout and stderr buffers - // to the correct file descriptor. Since forking is expensive, fish tries to avoid - // it when possible. - bool fork_was_skipped = false; - - const shared_ptr stdout_io = io_chain->get_io_for_fd(STDOUT_FILENO); - const shared_ptr stderr_io = io_chain->get_io_for_fd(STDERR_FILENO); const output_stream_t &stdout_stream = builtin_io_streams.out; const output_stream_t &stderr_stream = builtin_io_streams.err; - // If we are outputting to a file, we have to actually do it, even if we have no - // output, so that we can truncate the file. Does not apply to /dev/null. - bool must_fork = redirection_is_to_real_file(stdout_io.get()) || - redirection_is_to_real_file(stderr_io.get()); - if (!must_fork && p->is_last_in_job) { - // We are handling reads directly in the main loop. Note that we may still end - // up forking. - const bool stdout_is_bufferfill = - (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill); - const std::shared_ptr stdout_buffer = - stdout_is_bufferfill ? static_cast(stdout_io.get())->buffer() - : nullptr; - const bool no_stdout_output = stdout_stream.empty(); - const bool no_stderr_output = stderr_stream.empty(); - const bool stdout_discarded = stdout_stream.buffer().discarded(); + // Mark if we discarded output. + if (stdout_stream.buffer().discarded()) p->status = STATUS_READ_TOO_MUCH; - if (!stdout_discarded && no_stdout_output && no_stderr_output) { - // The builtin produced no output and is not inside of a pipeline. No - // need to fork or even output anything. - debug(4, L"Skipping fork: no output for internal builtin '%ls'", p->argv0()); - fork_was_skipped = true; - } else if (no_stderr_output && stdout_buffer) { - // The builtin produced no stderr, and its stdout is going to an - // internal buffer. There is no need to fork. This helps out the - // performance quite a bit in complex completion code. - // TODO: we're sloppy about handling explicitly separated output. - // Theoretically we could have explicitly separated output on stdout and - // also stderr output; in that case we ought to thread the exp-sep output - // through to the io buffer. We're getting away with this because the only - // thing that can output exp-sep output is `string split0` which doesn't - // also produce stderr. - debug(4, L"Skipping fork: buffered output for internal builtin '%ls'", p->argv0()); + // We will try to elide constructing an internal process. However if the output is going to a + // real file, we have to do it. For example in `echo -n > file.txt` we proceed to open file.txt + // even though there is no output, so that it is properly truncated. + const shared_ptr stdout_io = io_chain->get_io_for_fd(STDOUT_FILENO); + const shared_ptr stderr_io = io_chain->get_io_for_fd(STDERR_FILENO); + bool must_use_process = + redirection_is_to_real_file(stdout_io) || redirection_is_to_real_file(stderr_io); - stdout_buffer->append_from_stream(stdout_stream); - fork_was_skipped = true; - } else if (stdout_io.get() == NULL && stderr_io.get() == NULL) { - // We are writing to normal stdout and stderr. Just do it - no need to fork. - debug(4, L"Skipping fork: ordinary output for internal builtin '%ls'", p->argv0()); - const std::string outbuff = wcs2string(stdout_stream.contents()); - const std::string errbuff = wcs2string(stderr_stream.contents()); - bool builtin_io_done = - do_builtin_io(outbuff.data(), outbuff.size(), errbuff.data(), errbuff.size()); - if (!builtin_io_done && errno != EPIPE) { - redirect_tty_output(); // workaround glibc bug - debug(0, "!builtin_io_done and errno != EPIPE"); - show_stackframe(L'E'); - } - if (stdout_discarded) p->status = STATUS_READ_TOO_MUCH; - fork_was_skipped = true; - } + // If we are directing output to a buffer, then we can just transfer it directly without needing + // to write to the bufferfill pipe. Note this is how we handle explicitly separated stdout + // output (i.e. string split0) which can't really be sent through a pipe. + // TODO: we're sloppy about handling explicitly separated output. + // Theoretically we could have explicitly separated output on stdout and also stderr output; in + // that case we ought to thread the exp-sep output through to the io buffer. We're getting away + // with this because the only thing that can output exp-sep output is `string split0` which + // doesn't also produce stderr. Also note that we never send stderr to a buffer, so there's no + // need for a similar check for stderr. + bool stdout_done = false; + if (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) { + auto stdout_buffer = static_cast(stdout_io.get())->buffer(); + stdout_buffer->append_from_stream(stdout_stream); + stdout_done = true; } - if (fork_was_skipped) { + // Figure out any data remaining to write. We may have none in which case we can short-circuit. + std::string outbuff = stdout_done ? std::string{} : wcs2string(stdout_stream.contents()); + std::string errbuff = wcs2string(stderr_stream.contents()); + + // If we have no redirections for stdout/stderr, just write them directly. + if (!stdout_io && !stderr_io) { + bool did_err = false; + if (write_loop(STDOUT_FILENO, outbuff.data(), outbuff.size()) < 0) { + if (errno != EPIPE) { + did_err = true; + debug(0, "Error while writing to stdout"); + wperror(L"write_loop"); + } + } + if (write_loop(STDERR_FILENO, errbuff.data(), errbuff.size()) < 0) { + if (errno != EPIPE && !did_err) { + did_err = true; + debug(0, "Error while writing to stderr"); + wperror(L"write_loop"); + } + } + if (did_err) { + redirect_tty_output(); // workaround glibc bug + debug(0, "!builtin_io_done and errno != EPIPE"); + show_stackframe(L'E'); + } + // Clear the buffers to indicate we finished. + outbuff.clear(); + errbuff.clear(); + } + + if (!must_use_process && outbuff.empty() && errbuff.empty()) { + // We do not need to construct a background process. + // TODO: factor this job-status-setting stuff into a single place. p->completed = 1; if (p->is_last_in_job) { debug(4, L"Set status of job %d (%ls) to %d using short circuit", j->job_id, @@ -661,23 +660,13 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, int status = p->status; proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status); } + return true; } else { - // Ok, unfortunately, we have to do a real fork. Bummer. We work hard to make - // sure we don't have to wait for all our threads to exit, by arranging things - // so that we don't have to allocate memory or do anything except system calls - // in the child. - // - // These strings may contain embedded nulls, so don't treat them as C strings. - std::string outbuff = wcs2string(stdout_stream.contents()); - std::string errbuff = wcs2string(stderr_stream.contents()); - + // Construct and run our background process. fflush(stdout); fflush(stderr); - if (!run_internal_process(p, std::move(outbuff), std::move(errbuff), *io_chain)) { - return false; - } + return run_internal_process(p, std::move(outbuff), std::move(errbuff), *io_chain); } - return true; } /// Executes an external command. diff --git a/src/io.cpp b/src/io.cpp index 67a9c03d5..5dcff30de 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -31,6 +31,7 @@ void io_pipe_t::print() const { void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); } void io_buffer_t::append_from_stream(const output_stream_t &stream) { + if (stream.empty()) return; scoped_lock locker(append_lock_); if (buffer_.discarded()) return; if (stream.buffer().discarded()) { diff --git a/src/io.h b/src/io.h index 026ce8b36..6f5e12447 100644 --- a/src/io.h +++ b/src/io.h @@ -410,28 +410,25 @@ struct io_streams_t { output_stream_t err; // fd representing stdin. This is not closed by the destructor. - int stdin_fd; + int stdin_fd{-1}; // Whether stdin is "directly redirected," meaning it is the recipient of a pipe (foo | cmd) or // direct redirection (cmd < foo.txt). An "indirect redirection" would be e.g. begin ; cmd ; end // < foo.txt - bool stdin_is_directly_redirected; + bool stdin_is_directly_redirected{false}; // Indicates whether stdout and stderr are redirected (e.g. to a file or piped). - bool out_is_redirected; - bool err_is_redirected; + bool out_is_redirected{false}; + bool err_is_redirected{false}; // Actual IO redirections. This is only used by the source builtin. Unowned. - const io_chain_t *io_chain; + const io_chain_t *io_chain{nullptr}; - io_streams_t(size_t read_limit) - : out(read_limit), - err(read_limit), - stdin_fd(-1), - stdin_is_directly_redirected(false), - out_is_redirected(false), - err_is_redirected(false), - io_chain(NULL) {} + // io_streams_t cannot be copied. + io_streams_t(const io_streams_t &) = delete; + void operator=(const io_streams_t &) = delete; + + explicit io_streams_t(size_t read_limit) : out(read_limit), err(read_limit), stdin_fd(-1) {} }; #if 0 diff --git a/src/postfork.cpp b/src/postfork.cpp index 77d7e04f3..e24b3fd62 100644 --- a/src/postfork.cpp +++ b/src/postfork.cpp @@ -379,27 +379,3 @@ void safe_report_exec_error(int err, const char *actual_cmd, const char *const * } } } - -/// Perform output from builtins. May be called from a forked child, so don't do anything that may -/// allocate memory, etc. -bool do_builtin_io(const char *out, size_t outlen, const char *err, size_t errlen) { - int saved_errno = 0; - bool success = true; - if (out && outlen && write_loop(STDOUT_FILENO, out, outlen) < 0) { - saved_errno = errno; - if (errno != EPIPE) { - debug_safe(0, "Error while writing to stdout"); - errno = saved_errno; - safe_perror("write_loop"); - } - success = false; - } - - if (err && errlen && write_loop(STDERR_FILENO, err, errlen) < 0) { - saved_errno = errno; - success = false; - } - - errno = saved_errno; - return success; -} diff --git a/src/postfork.h b/src/postfork.h index 27c9d57bb..f09efd8af 100644 --- a/src/postfork.h +++ b/src/postfork.h @@ -41,9 +41,6 @@ int setup_child_process(process_t *p, const dup2_list_t &dup2s); /// wait for threads to die. pid_t execute_fork(bool wait_for_threads_to_die); -/// Perform output from builtins. Returns true on success. -bool do_builtin_io(const char *out, size_t outlen, const char *err, size_t errlen); - /// Report an error from failing to exec or posix_spawn a command. void safe_report_exec_error(int err, const char *actual_cmd, const char *const *argv, const char *const *envv);