diff --git a/CMakeLists.txt b/CMakeLists.txt index 32dee2061..eb7949f7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,7 +80,7 @@ SET(FISH_SRCS src/postfork.cpp src/proc.cpp src/reader.cpp src/sanity.cpp src/screen.cpp src/signal.cpp src/tinyexpr.cpp src/tnode.cpp src/tokenizer.cpp src/utf8.cpp src/util.cpp src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp - src/future_feature_flags.cpp src/redirection.cpp + src/future_feature_flags.cpp src/redirection.cpp src/topic_monitor.cpp ) # Header files are just globbed. diff --git a/Makefile.in b/Makefile.in index eba0b41be..beb3b9253 100644 --- a/Makefile.in +++ b/Makefile.in @@ -119,7 +119,7 @@ FISH_OBJS := obj/autoload.o obj/builtin.o obj/builtin_bg.o obj/builtin_bind.o ob obj/parser_keywords.o obj/path.o obj/postfork.o obj/proc.o obj/reader.o \ obj/sanity.o obj/screen.o obj/signal.o obj/tinyexpr.o obj/tokenizer.o obj/tnode.o obj/utf8.o \ obj/util.o obj/wcstringutil.o obj/wgetopt.o obj/wildcard.o obj/wutil.o \ - obj/future_feature_flags.o obj/redirection.o + obj/future_feature_flags.o obj/redirection.o obj/topic_monitor.o FISH_INDENT_OBJS := obj/fish_indent.o obj/print_help.o $(FISH_OBJS) diff --git a/src/common.h b/src/common.h index 1433ad151..b74d6d98d 100644 --- a/src/common.h +++ b/src/common.h @@ -307,6 +307,12 @@ void vec_append(std::vector &receiver, std::vector &&donator) { std::make_move_iterator(donator.end())); } +/// Move an object into a shared_ptr. +template +std::shared_ptr move_to_sharedptr(T &&v) { + return std::make_shared(std::move(v)); +} + /// Print a stack trace to stderr. void show_stackframe(const wchar_t msg_level, int frame_count = 100, int skip_levels = 0); diff --git a/src/enum_set.h b/src/enum_set.h index 6ef6e2c8c..cbec399d0 100644 --- a/src/enum_set.h +++ b/src/enum_set.h @@ -1,26 +1,111 @@ #pragma once #include +#include +#include +/// A type (to specialize) that provides a count for an enum. +/// Example: +/// template<> struct enum_info_t +/// { static constexpr auto count = MyEnum::COUNT; }; template -class enum_set_t { +struct enum_info_t {}; + +/// \return the count of an enum. +template +constexpr size_t enum_count() { + return static_cast(enum_info_t::count); +} + +/// A bit set indexed by an enum type. +template +class enum_set_t : private std::bitset()> { private: + using super = std::bitset()>; + static size_t index_of(T t) { return static_cast(t); } + + explicit enum_set_t(unsigned long raw) : super(raw) {} + + public: + enum_set_t() = default; + + /*implicit*/ enum_set_t(T v) { set(v); } + + /*implicit*/ enum_set_t(std::initializer_list vs) { + for (T v : vs) set(v); + } + + static enum_set_t from_raw(unsigned long v) { return enum_set_t{v}; } + + unsigned long to_raw() const { return super::to_ulong(); } + + bool get(T t) const { return super::test(index_of(t)); } + + void set(T t, bool v = true) { super::set(index_of(t), v); } + + void clear(T t) { super::reset(index_of(t)); } + + bool none() const { return super::none(); } + + bool any() const { return super::any(); } + + bool operator==(const enum_set_t &rhs) const { return super::operator==(rhs); } + + bool operator!=(const enum_set_t &rhs) const { return super::operator!=(rhs); } +}; + +/// An array of Elem indexed by an enum class. +template +class enum_array_t : public std::array()> { + using super = std::array()>; using base_type_t = typename std::underlying_type::type; - std::bitset<8 * sizeof(base_type_t)> bitmask{0}; + static int index_of(T t) { return static_cast(t); } public: - bool get(T t) const { return bitmask.test(index_of(t)); } - - void set(T t, bool v) { - if (v) { - bitmask.set(index_of(t)); - } else { - bitmask.reset(index_of(t)); - } - } - - void set(T t) { bitmask.set(index_of(t)); } - - void clear(T t) { bitmask.reset(index_of(t)); } + Elem &at(T t) { return super::at(index_of(t)); } + const Elem &at(T t) const { return super::at(index_of(t)); } + Elem &operator[](T t) { return super::operator[](index_of(t)); } + const Elem &operator[](T t) const { return super::operator[](index_of(t)); } +}; + +/// A counting iterator for an enum class. +/// This enumerates the values of an enum class from 0 up to (not including) count. +/// Example: +/// for (auto v : enum_iter_t) {...} +template +class enum_iter_t { + using base_type_t = typename std::underlying_type::type; + struct iterator_t { + friend class enum_iter_t; + using iterator_category = std::forward_iterator_tag; + using value_type = T; + using difference_type = long; + + explicit iterator_t(base_type_t v) : v_(v) {} + + T operator*() const { return static_cast(v_); } + const T *operator->() const { return static_cast(v_); } + + iterator_t &operator++() { + v_ += 1; + return *this; + } + + iterator_t operator++(int) { + auto res = *this; + v_ += 1; + return res; + } + + bool operator==(iterator_t rhs) const { return v_ == rhs.v_; } + bool operator!=(iterator_t rhs) const { return v_ != rhs.v_; } + + private: + base_type_t v_{}; + }; + + public: + iterator_t begin() const { return iterator_t{0}; } + iterator_t end() const { return iterator_t{static_cast(enum_count())}; } }; diff --git a/src/exec.cpp b/src/exec.cpp index bf311bb74..9b29ed8f1 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -34,6 +34,7 @@ #include "fallback.h" // IWYU pragma: keep #include "function.h" #include "io.h" +#include "iothread.h" #include "parse_tree.h" #include "parser.h" #include "postfork.h" @@ -55,16 +56,6 @@ /// Base open mode to pass to calls to open. #define OPEN_MASK 0666 -/// Called in a forked child. -static void exec_write_and_exit(int fd, const char *buff, size_t count, int status) { - if (write_loop(fd, buff, count) == -1) { - debug(0, WRITE_ERROR); - wperror(L"write"); - exit_without_destructors(status); - } - exit_without_destructors(status); -} - void exec_close(int fd) { ASSERT_IS_MAIN_THREAD(); @@ -82,12 +73,11 @@ void exec_close(int fd) { } /// Returns true if the redirection is a file redirection to a file other than /dev/null. -static bool redirection_is_to_real_file(const io_data_t *io) { +static bool redirection_is_to_real_file(const shared_ptr &io) { bool result = false; - if (io != NULL && io->io_mode == io_mode_t::file) { + if (io && io->io_mode == io_mode_t::file) { // It's a file redirection. Compare the path to /dev/null. - const io_file_t *io_file = static_cast(io); - const char *path = io_file->filename_cstr; + const char *path = static_cast(io.get())->filename_cstr; if (strcmp(path, "/dev/null") != 0) { // It's not /dev/null. result = true; @@ -361,6 +351,98 @@ static void on_process_created(const std::shared_ptr &j, pid_t child_pid) } } +/// Construct an internal process for the process p. In the background, write the data \p outdata to +/// stdout and \p errdata to stderr, respecting the io chain \p ios. For example if target_fd is 1 +/// (stdout), and there is a dup2 3->1, then we need to write to fd 3. Then exit the internal +/// process. +static bool run_internal_process(process_t *p, std::string outdata, std::string errdata, + io_chain_t ios) { + p->check_generations_before_launch(); + + // We want both the dup2s and the io_chain_ts to be kept alive by the background thread, because + // they may own an fd that we want to write to. Move them all to a shared_ptr. The strings as + // well (they may be long). + // Construct a little helper struct to make it simpler to move into our closure without copying. + struct write_fields_t { + int src_outfd{-1}; + std::string outdata{}; + + int src_errfd{-1}; + std::string errdata{}; + + io_chain_t ios{}; + maybe_t dup2s{}; + std::shared_ptr internal_proc{}; + + int success_status{}; + + bool skip_out() const { return outdata.empty() || src_outfd < 0; } + + bool skip_err() const { return errdata.empty() || src_errfd < 0; } + }; + + auto f = std::make_shared(); + f->outdata = std::move(outdata); + f->errdata = std::move(errdata); + + // Construct and assign the internal process to the real process. + p->internal_proc_ = std::make_shared(); + f->internal_proc = p->internal_proc_; + + // Resolve the IO chain. + // Note it's important we do this even if we have no out or err data, because we may have been + // asked to truncate a file (e.g. `echo -n '' > /tmp/truncateme.txt'). The open() in the dup2 + // list resolution will ensure this happens. + f->dup2s = dup2_list_t::resolve_chain(ios); + if (!f->dup2s) { + return false; + } + + // Figure out which source fds to write to. If they are closed (unlikely) we just exit + // successfully. + f->src_outfd = f->dup2s->fd_for_target_fd(STDOUT_FILENO); + f->src_errfd = f->dup2s->fd_for_target_fd(STDERR_FILENO); + + // If we have nothing to right we can elide the thread. + // TODO: support eliding output to /dev/null. + if (f->skip_out() && f->skip_err()) { + f->internal_proc->mark_exited(EXIT_SUCCESS); + return true; + } + + // Ensure that ios stays alive, it may own fds. + f->ios = ios; + + // If our process is a builtin, it will have already set its status value. Make sure we + // propagate that if our I/O succeeds and don't read it on a background thread. TODO: have + // builtin_run provide this directly, rather than setting it in the process. + f->success_status = p->status; + + iothread_perform([f]() { + int status = f->success_status; + if (!f->skip_out()) { + ssize_t ret = write_loop(f->src_outfd, f->outdata.data(), f->outdata.size()); + if (ret < 0) { + if (errno != EPIPE) { + wperror(L"write"); + } + if (!status) status = 1; + } + } + if (!f->skip_err()) { + ssize_t ret = write_loop(f->src_errfd, f->errdata.data(), f->errdata.size()); + if (ret < 0) { + if (errno != EPIPE) { + wperror(L"write"); + } + if (!status) status = 1; + } + } + f->internal_proc->mark_exited(status); + }); + return true; +} + /// Call fork() as part of executing a process \p p in a job \j. Execute \p child_action in the /// context of the child. Returns true if fork succeeded, false if fork failed. static bool fork_child_for_process(const std::shared_ptr &job, process_t *p, @@ -505,71 +587,71 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr &j, process_t *p, io_chain_t *io_chain, const io_streams_t &builtin_io_streams) { assert(p->type == INTERNAL_BUILTIN && "Process is not a builtin"); - // Handle output from builtin commands. In the general case, this means forking of a - // worker process, that will write out the contents of the stdout and stderr buffers - // to the correct file descriptor. Since forking is expensive, fish tries to avoid - // it when possible. - bool fork_was_skipped = false; - - const shared_ptr stdout_io = io_chain->get_io_for_fd(STDOUT_FILENO); - const shared_ptr stderr_io = io_chain->get_io_for_fd(STDERR_FILENO); const output_stream_t &stdout_stream = builtin_io_streams.out; const output_stream_t &stderr_stream = builtin_io_streams.err; - // If we are outputting to a file, we have to actually do it, even if we have no - // output, so that we can truncate the file. Does not apply to /dev/null. - bool must_fork = redirection_is_to_real_file(stdout_io.get()) || - redirection_is_to_real_file(stderr_io.get()); - if (!must_fork && p->is_last_in_job) { - // We are handling reads directly in the main loop. Note that we may still end - // up forking. - const bool stdout_is_bufferfill = - (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill); - const std::shared_ptr stdout_buffer = - stdout_is_bufferfill ? static_cast(stdout_io.get())->buffer() - : nullptr; - const bool no_stdout_output = stdout_stream.empty(); - const bool no_stderr_output = stderr_stream.empty(); - const bool stdout_discarded = stdout_stream.buffer().discarded(); + // Mark if we discarded output. + if (stdout_stream.buffer().discarded()) p->status = STATUS_READ_TOO_MUCH; - if (!stdout_discarded && no_stdout_output && no_stderr_output) { - // The builtin produced no output and is not inside of a pipeline. No - // need to fork or even output anything. - debug(4, L"Skipping fork: no output for internal builtin '%ls'", p->argv0()); - fork_was_skipped = true; - } else if (no_stderr_output && stdout_buffer) { - // The builtin produced no stderr, and its stdout is going to an - // internal buffer. There is no need to fork. This helps out the - // performance quite a bit in complex completion code. - // TODO: we're sloppy about handling explicitly separated output. - // Theoretically we could have explicitly separated output on stdout and - // also stderr output; in that case we ought to thread the exp-sep output - // through to the io buffer. We're getting away with this because the only - // thing that can output exp-sep output is `string split0` which doesn't - // also produce stderr. - debug(4, L"Skipping fork: buffered output for internal builtin '%ls'", p->argv0()); + // We will try to elide constructing an internal process. However if the output is going to a + // real file, we have to do it. For example in `echo -n > file.txt` we proceed to open file.txt + // even though there is no output, so that it is properly truncated. + const shared_ptr stdout_io = io_chain->get_io_for_fd(STDOUT_FILENO); + const shared_ptr stderr_io = io_chain->get_io_for_fd(STDERR_FILENO); + bool must_use_process = + redirection_is_to_real_file(stdout_io) || redirection_is_to_real_file(stderr_io); - stdout_buffer->append_from_stream(stdout_stream); - fork_was_skipped = true; - } else if (stdout_io.get() == NULL && stderr_io.get() == NULL) { - // We are writing to normal stdout and stderr. Just do it - no need to fork. - debug(4, L"Skipping fork: ordinary output for internal builtin '%ls'", p->argv0()); - const std::string outbuff = wcs2string(stdout_stream.contents()); - const std::string errbuff = wcs2string(stderr_stream.contents()); - bool builtin_io_done = - do_builtin_io(outbuff.data(), outbuff.size(), errbuff.data(), errbuff.size()); - if (!builtin_io_done && errno != EPIPE) { - redirect_tty_output(); // workaround glibc bug - debug(0, "!builtin_io_done and errno != EPIPE"); - show_stackframe(L'E'); - } - if (stdout_discarded) p->status = STATUS_READ_TOO_MUCH; - fork_was_skipped = true; - } + // If we are directing output to a buffer, then we can just transfer it directly without needing + // to write to the bufferfill pipe. Note this is how we handle explicitly separated stdout + // output (i.e. string split0) which can't really be sent through a pipe. + // TODO: we're sloppy about handling explicitly separated output. + // Theoretically we could have explicitly separated output on stdout and also stderr output; in + // that case we ought to thread the exp-sep output through to the io buffer. We're getting away + // with this because the only thing that can output exp-sep output is `string split0` which + // doesn't also produce stderr. Also note that we never send stderr to a buffer, so there's no + // need for a similar check for stderr. + bool stdout_done = false; + if (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) { + auto stdout_buffer = static_cast(stdout_io.get())->buffer(); + stdout_buffer->append_from_stream(stdout_stream); + stdout_done = true; } - if (fork_was_skipped) { + // Figure out any data remaining to write. We may have none in which case we can short-circuit. + std::string outbuff = stdout_done ? std::string{} : wcs2string(stdout_stream.contents()); + std::string errbuff = wcs2string(stderr_stream.contents()); + + // If we have no redirections for stdout/stderr, just write them directly. + if (!stdout_io && !stderr_io) { + bool did_err = false; + if (write_loop(STDOUT_FILENO, outbuff.data(), outbuff.size()) < 0) { + if (errno != EPIPE) { + did_err = true; + debug(0, "Error while writing to stdout"); + wperror(L"write_loop"); + } + } + if (write_loop(STDERR_FILENO, errbuff.data(), errbuff.size()) < 0) { + if (errno != EPIPE && !did_err) { + did_err = true; + debug(0, "Error while writing to stderr"); + wperror(L"write_loop"); + } + } + if (did_err) { + redirect_tty_output(); // workaround glibc bug + debug(0, "!builtin_io_done and errno != EPIPE"); + show_stackframe(L'E'); + } + // Clear the buffers to indicate we finished. + outbuff.clear(); + errbuff.clear(); + } + + if (!must_use_process && outbuff.empty() && errbuff.empty()) { + // We do not need to construct a background process. + // TODO: factor this job-status-setting stuff into a single place. p->completed = 1; if (p->is_last_in_job) { debug(4, L"Set status of job %d (%ls) to %d using short circuit", j->job_id, @@ -578,37 +660,13 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, int status = p->status; proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status); } + return true; } else { - // Ok, unfortunately, we have to do a real fork. Bummer. We work hard to make - // sure we don't have to wait for all our threads to exit, by arranging things - // so that we don't have to allocate memory or do anything except system calls - // in the child. - // - // These strings may contain embedded nulls, so don't treat them as C strings. - const std::string outbuff_str = wcs2string(stdout_stream.contents()); - const char *outbuff = outbuff_str.data(); - size_t outbuff_len = outbuff_str.size(); - - const std::string errbuff_str = wcs2string(stderr_stream.contents()); - const char *errbuff = errbuff_str.data(); - size_t errbuff_len = errbuff_str.size(); - - // Resolve our IO chain to a sequence of dup2s. - auto dup2s = dup2_list_t::resolve_chain(*io_chain); - if (!dup2s) { - return false; - } - + // Construct and run our background process. fflush(stdout); fflush(stderr); - if (!fork_child_for_process(j, p, *dup2s, false, "internal builtin", [&] { - do_builtin_io(outbuff, outbuff_len, errbuff, errbuff_len); - exit_without_destructors(p->status); - })) { - return false; - } + return run_internal_process(p, std::move(outbuff), std::move(errbuff), *io_chain); } - return true; } /// Executes an external command. @@ -784,24 +842,9 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr io_chain.remove(block_output_bufferfill); auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill)); - // Resolve our IO chain to a sequence of dup2s. - auto dup2s = dup2_list_t::resolve_chain(io_chain); - if (!dup2s) { - return false; - } - - const std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); - const char *buffer = buffer_contents.data(); - size_t count = buffer_contents.size(); - if (count > 0) { - // We don't have to drain threads here because our child process is simple. - const char *fork_reason = - p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io"; - if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] { - exec_write_and_exit(STDOUT_FILENO, buffer, count, status); - })) { - return false; - } + std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); + if (!buffer_contents.empty()) { + return run_internal_process(p, std::move(buffer_contents), {} /*errdata*/, io_chain); } else { if (p->is_last_in_job) { proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status); @@ -899,6 +942,7 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< } // Execute the process. + p->check_generations_before_launch(); switch (p->type) { case INTERNAL_FUNCTION: case INTERNAL_BLOCK_NODE: { diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index 7b1f5c6c3..351e49a77 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -69,6 +70,7 @@ #include "signal.h" #include "tnode.h" #include "tokenizer.h" +#include "topic_monitor.h" #include "utf8.h" #include "util.h" #include "wcstringutil.h" @@ -286,6 +288,48 @@ static void test_str_to_num() { L"converting invalid num to long did not fail"); } +enum class test_enum { alpha, beta, gamma, COUNT }; + +template <> +struct enum_info_t { + static constexpr auto count = test_enum::COUNT; +}; + +static void test_enum_set() { + say(L"Testing enum set"); + enum_set_t es; + do_test(es.none()); + do_test(!es.any()); + do_test(es.to_raw() == 0); + do_test(es == enum_set_t::from_raw(0)); + do_test(es != enum_set_t::from_raw(1)); + + es.set(test_enum::beta); + do_test(es.to_raw() == 2); + do_test(es == enum_set_t::from_raw(2)); + do_test(es == enum_set_t{test_enum::beta}); + do_test(es != enum_set_t::from_raw(3)); + do_test(es.any()); + do_test(!es.none()); + + unsigned idx = 0; + for (auto v : enum_iter_t{}) { + do_test(static_cast(v) == idx); + idx++; + } + do_test(static_cast(test_enum::COUNT) == idx); +} + +static void test_enum_array() { + say(L"Testing enum array"); + enum_array_t es{}; + do_test(es.size() == enum_count()); + es[test_enum::beta] = "abc"; + do_test(es[test_enum::beta] == "abc"); + es.at(test_enum::gamma) = "def"; + do_test(es.at(test_enum::gamma) == "def"); +} + /// Test sane escapes. static void test_unescape_sane() { const struct test_t { @@ -2378,6 +2422,30 @@ static void test_dup2s() { do_test(!list.has_value()); } +static void test_dup2s_fd_for_target_fd() { + using std::make_shared; + io_chain_t chain; + // note io_fd_t params are backwards from dup2. + chain.push_back(make_shared(10)); + chain.push_back(make_shared(9, 10, true)); + chain.push_back(make_shared(5, 8, true)); + chain.push_back(make_shared(1, 4, true)); + chain.push_back(make_shared(3, 5, true)); + auto list = dup2_list_t::resolve_chain(chain); + + do_test(list.has_value()); + do_test(list->fd_for_target_fd(3) == 8); + do_test(list->fd_for_target_fd(5) == 8); + do_test(list->fd_for_target_fd(8) == 8); + do_test(list->fd_for_target_fd(1) == 4); + do_test(list->fd_for_target_fd(4) == 4); + do_test(list->fd_for_target_fd(100) == 100); + do_test(list->fd_for_target_fd(0) == 0); + do_test(list->fd_for_target_fd(-1) == -1); + do_test(list->fd_for_target_fd(9) == -1); + do_test(list->fd_for_target_fd(10) == -1); +} + /// Testing colors. static void test_colors() { say(L"Testing colors"); @@ -5031,6 +5099,68 @@ void test_normalize_path() { do_test(path_normalize_for_cd(L"/abc/def/", L"../ghi/..") == L"/abc/ghi/.."); } +static void test_topic_monitor() { + say(L"Testing topic monitor"); + topic_monitor_t monitor; + generation_list_t gens{}; + constexpr auto t = topic_t::sigchld; + do_test(gens[t] == 0); + do_test(monitor.generation_for_topic(t) == 0); + auto changed = monitor.check(&gens, topic_set_t{t}, false /* wait */); + do_test(changed.none()); + do_test(gens[t] == 0); + + monitor.post(t); + changed = monitor.check(&gens, topic_set_t{t}, true /* wait */); + do_test(changed == topic_set_t{t}); + do_test(gens[t] == 1); + do_test(monitor.generation_for_topic(t) == 1); + + monitor.post(t); + do_test(monitor.generation_for_topic(t) == 2); + changed = monitor.check(&gens, topic_set_t{t}, true /* wait */); + do_test(changed == topic_set_t{t}); +} + +static void test_topic_monitor_torture() { + say(L"Torture-testing topic monitor"); + topic_monitor_t monitor; + const size_t thread_count = 64; + constexpr auto t1 = topic_t::sigchld; + constexpr auto t2 = topic_t::sighupint; + std::vector gens; + gens.resize(thread_count, generation_list_t{}); + std::atomic post_count{}; + for (auto &gen : gens) { + gen = monitor.current_generations(); + post_count += 1; + monitor.post(t1); + } + + std::atomic completed{}; + std::vector threads; + for (size_t i = 0; i < thread_count; i++) { + threads.emplace_back( + [&](size_t i) { + for (size_t j = 0; j < (1 << 11); j++) { + auto before = gens[i]; + auto changed = monitor.check(&gens[i], topic_set_t{t1, t2}, true /* wait */); + do_test(before[t1] < gens[i][t1]); + do_test(gens[i][t1] <= post_count); + do_test(gens[i][t2] == 0); + } + auto amt = completed.fetch_add(1, std::memory_order_relaxed); + }, + i); + } + + while (completed.load(std::memory_order_relaxed) < thread_count) { + post_count += 1; + monitor.post(t1); + } + for (auto &t : threads) t.join(); +} + /// Main test. int main(int argc, char **argv) { UNUSED(argc); @@ -5082,6 +5212,8 @@ int main(int argc, char **argv) { if (should_test_function("wcstring_tok")) test_wcstring_tok(); if (should_test_function("env_vars")) test_env_vars(); if (should_test_function("str_to_num")) test_str_to_num(); + if (should_test_function("enum")) test_enum_set(); + if (should_test_function("enum")) test_enum_array(); if (should_test_function("highlighting")) test_highlighting(); if (should_test_function("new_parser_ll2")) test_new_parser_ll2(); if (should_test_function("new_parser_fuzzing")) @@ -5115,6 +5247,7 @@ int main(int argc, char **argv) { if (should_test_function("test")) test_test(); if (should_test_function("wcstod")) test_wcstod(); if (should_test_function("dup2s")) test_dup2s(); + if (should_test_function("dup2s")) test_dup2s_fd_for_target_fd(); if (should_test_function("path")) test_path(); if (should_test_function("pager_navigation")) test_pager_navigation(); if (should_test_function("pager_layout")) test_pager_layout(); @@ -5148,6 +5281,8 @@ int main(int argc, char **argv) { if (should_test_function("maybe")) test_maybe(); if (should_test_function("layout_cache")) test_layout_cache(); if (should_test_function("normalize")) test_normalize_path(); + if (should_test_function("topics")) test_topic_monitor(); + if (should_test_function("topics")) test_topic_monitor_torture(); // history_tests_t::test_history_speed(); say(L"Encountered %d errors in low-level tests", err_count); diff --git a/src/io.cpp b/src/io.cpp index 6f72c3eb0..5dcff30de 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -13,6 +13,7 @@ #include "fallback.h" // IWYU pragma: keep #include "io.h" #include "iothread.h" +#include "redirection.h" #include "wutil.h" // IWYU pragma: keep io_data_t::~io_data_t() = default; @@ -30,6 +31,7 @@ void io_pipe_t::print() const { void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); } void io_buffer_t::append_from_stream(const output_stream_t &stream) { + if (stream.empty()) return; scoped_lock locker(append_lock_); if (buffer_.discarded()) return; if (stream.buffer().discarded()) { @@ -122,7 +124,7 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { // We want our background thread to own the fd but it's not easy to move into a std::function. // Use a shared_ptr. - auto fdref = std::make_shared(std::move(fd)); + auto fdref = move_to_sharedptr(std::move(fd)); // Our function to read until the receiver is closed. // It's OK to capture 'this' by value because 'this' owns the background thread and joins it diff --git a/src/io.h b/src/io.h index 026ce8b36..6f5e12447 100644 --- a/src/io.h +++ b/src/io.h @@ -410,28 +410,25 @@ struct io_streams_t { output_stream_t err; // fd representing stdin. This is not closed by the destructor. - int stdin_fd; + int stdin_fd{-1}; // Whether stdin is "directly redirected," meaning it is the recipient of a pipe (foo | cmd) or // direct redirection (cmd < foo.txt). An "indirect redirection" would be e.g. begin ; cmd ; end // < foo.txt - bool stdin_is_directly_redirected; + bool stdin_is_directly_redirected{false}; // Indicates whether stdout and stderr are redirected (e.g. to a file or piped). - bool out_is_redirected; - bool err_is_redirected; + bool out_is_redirected{false}; + bool err_is_redirected{false}; // Actual IO redirections. This is only used by the source builtin. Unowned. - const io_chain_t *io_chain; + const io_chain_t *io_chain{nullptr}; - io_streams_t(size_t read_limit) - : out(read_limit), - err(read_limit), - stdin_fd(-1), - stdin_is_directly_redirected(false), - out_is_redirected(false), - err_is_redirected(false), - io_chain(NULL) {} + // io_streams_t cannot be copied. + io_streams_t(const io_streams_t &) = delete; + void operator=(const io_streams_t &) = delete; + + explicit io_streams_t(size_t read_limit) : out(read_limit), err(read_limit), stdin_fd(-1) {} }; #if 0 diff --git a/src/iothread.h b/src/iothread.h index 5e677b639..39d99e539 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -2,6 +2,7 @@ #ifndef FISH_IOTHREAD_H #define FISH_IOTHREAD_H +#include #include #include diff --git a/src/postfork.cpp b/src/postfork.cpp index 77d7e04f3..e24b3fd62 100644 --- a/src/postfork.cpp +++ b/src/postfork.cpp @@ -379,27 +379,3 @@ void safe_report_exec_error(int err, const char *actual_cmd, const char *const * } } } - -/// Perform output from builtins. May be called from a forked child, so don't do anything that may -/// allocate memory, etc. -bool do_builtin_io(const char *out, size_t outlen, const char *err, size_t errlen) { - int saved_errno = 0; - bool success = true; - if (out && outlen && write_loop(STDOUT_FILENO, out, outlen) < 0) { - saved_errno = errno; - if (errno != EPIPE) { - debug_safe(0, "Error while writing to stdout"); - errno = saved_errno; - safe_perror("write_loop"); - } - success = false; - } - - if (err && errlen && write_loop(STDERR_FILENO, err, errlen) < 0) { - saved_errno = errno; - success = false; - } - - errno = saved_errno; - return success; -} diff --git a/src/postfork.h b/src/postfork.h index 27c9d57bb..f09efd8af 100644 --- a/src/postfork.h +++ b/src/postfork.h @@ -41,9 +41,6 @@ int setup_child_process(process_t *p, const dup2_list_t &dup2s); /// wait for threads to die. pid_t execute_fork(bool wait_for_threads_to_die); -/// Perform output from builtins. Returns true on success. -bool do_builtin_io(const char *out, size_t outlen, const char *err, size_t errlen); - /// Report an error from failing to exec or posix_spawn a command. void safe_report_exec_error(int err, const char *actual_cmd, const char *const *argv, const char *const *envv); diff --git a/src/proc.cpp b/src/proc.cpp index c52cf09dd..e0bfff3e9 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -247,6 +247,13 @@ bool job_t::signal(int signal) { return true; } +void internal_proc_t::mark_exited(int status) { + assert(!exited() && "Process is already exited"); + exited_.store(true, std::memory_order_relaxed); + status_.store(status, std::memory_order_release); + topic_monitor_t::principal().post(topic_t::internal_exit); +} + static void mark_job_complete(const job_t *j) { for (auto &p : j->processes) { p->completed = 1; @@ -324,7 +331,11 @@ static void handle_child_status(pid_t pid, int status) { } } -process_t::process_t() {} +process_t::process_t() = default; + +void process_t::check_generations_before_launch() { + gens_ = topic_monitor_t::principal().current_generations(); +} job_t::job_t(job_id_t jobid, io_chain_t bio, std::shared_ptr parent) : block_io(std::move(bio)), @@ -361,194 +372,74 @@ void add_disowned_pgid(pid_t pgid) { } } -/// A static value tracking how many SIGCHLDs we have seen, which is used in a heurstic to -/// determine if we should call waitpid() at all in `process_mark_finished_children`. -static volatile process_generation_count_t s_sigchld_generation_cnt = 0; - -/// See if any children of a fully constructed job have exited or been killed, and mark them -/// accordingly. We cannot reap just any child that's exited, (as in, `waitpid(-1,…`) since -/// that may reap a pgrp leader that has exited but in a job with another process that has yet to -/// launch and join its pgrp (#5219). -/// \param block_on_fg when true, blocks waiting for the foreground job to finish. -/// \return whether the operation completed without incident -static bool process_mark_finished_children(bool block_on_fg) { +/// See if any reapable processes have exited, and mark them accordingly. +/// \param block_ok if no reapable processes have exited, block until one is (or until we receive a +/// signal). +static void process_mark_finished_children(bool block_ok) { ASSERT_IS_MAIN_THREAD(); - // We can't always use SIGCHLD to determine if waitpid() should be called since it is not - // strictly one-SIGCHLD-per-one-child-exited (i.e. multiple exits can share a SIGCHLD call) and - // we a) return immediately the first time a dead child is reaped, b) explicitly skip over jobs - // that aren't yet fully constructed, so it's possible that we can get SIGCHLD and even find a - // killed child in the jobs we are reaping, but also have an exited child process in a job that - // hasn't been fully constructed yet - which means we can end up never knowing about the exited - // child process in that job if we use SIGCHLD count as the only metric for whether or not - // waitpid() is called. - // Without this optimization, the slowdown caused by calling waitpid() even just once each time - // `process_mark_finished_children()` is called is rather obvious (see the performance-related - // discussion in #5219), making it worth the complexity of this heuristic. + // Get the exit and signal generations of all reapable processes. + // The exit generation tells us if we have an exit; the signal generation allows for detecting + // SIGHUP and SIGINT. + // Get the gen count of all reapable processes. + topic_set_t reaptopics{}; + generation_list_t gens{}; + gens.fill(invalid_generation); + job_iterator_t jobs; + while (auto *j = jobs.next()) { + for (const auto &proc : j->processes) { + if (auto mtopic = j->reap_topic_for_process(proc.get())) { + topic_t topic = *mtopic; + reaptopics.set(topic); + gens[topic] = std::min(gens[topic], proc->gens_[topic]); - /// Tracks whether or not we received SIGCHLD without checking all jobs (due to jobs under - /// construction), forcing a full waitpid loop. - static bool dirty_state = true; - static process_generation_count_t last_sigchld_count = -1; - - // If the last time that we received a SIGCHLD we did not waitpid all jobs, we cannot early out. - if (!dirty_state && last_sigchld_count == s_sigchld_generation_cnt) { - // If we have foreground jobs, we need to block on them below - if (!block_on_fg) { - // We can assume that no children have exited and that all waitpid calls with - // WNOHANG below will confirm that. - return true; + reaptopics.set(topic_t::sighupint); + gens[topic_t::sighupint] = + std::min(gens[topic_t::sighupint], proc->gens_[topic_t::sighupint]); + } } } - last_sigchld_count = s_sigchld_generation_cnt; - bool jobs_skipped = false; - bool has_error = false; - job_t *job_fg = nullptr; + if (reaptopics.none()) { + // No reapable processes, nothing to wait for. + return; + } - // Reap only processes belonging to fully-constructed jobs to prevent reaping of processes - // before others in the same process group have a chance to join their pgrp. - job_iterator_t jobs; - while (auto j = jobs.next()) { - // (A job can have pgrp INVALID_PID if it consists solely of builtins that perform no IO) - if (j->pgid == INVALID_PID || !j->is_constructed()) { - debug(5, "Skipping wait on incomplete job %d (%ls)", j->job_id, j->preview().c_str()); - jobs_skipped = true; - continue; - } + // Now check for changes, optionally waiting. + auto changed_topics = topic_monitor_t::principal().check(&gens, reaptopics, block_ok); + if (changed_topics.none()) return; - if (j != job_fg && j->is_foreground() && !j->is_stopped() && !j->is_completed()) { - // Ensure that we don't have multiple fully constructed foreground jobs. - assert((!job_fg || !job_fg->job_chain_is_fully_constructed() || - !j->job_chain_is_fully_constructed()) && - "More than one active, fully-constructed foreground job!"); - job_fg = j; - } + // We got some changes. Since we last checked we received SIGCHLD, and or HUP/INT. + // Update the hup/int generations and reap any reapable processes. + jobs.reset(); + while (auto *j = jobs.next()) { + for (const auto &proc : j->processes) { + if (auto mtopic = j->reap_topic_for_process(proc.get())) { + // Update the signal hup/int gen. + proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint]; - // Whether we will wait for uncompleted processes depends on the combination of - // `block_on_fg` and the nature of the process. Default is WNOHANG, but if foreground, - // constructed, not stopped, *and* block_on_fg is true, then no WNOHANG (i.e. "HANG"). - int options = WUNTRACED | WNOHANG; - - // We should never block twice in the same go, as `waitpid()' returning could mean one - // process completed or many, and there is a race condition when calling `waitpid()` after - // the process group exits having reaped all children and terminated the process group and - // when a subsequent call to `waitpid()` for the same process group returns immediately if - // that process group no longer exists. i.e. it's possible for all processes to have exited - // but the process group to remain momentarily valid, in which case calling `waitpid()` - // without WNOHANG can cause an infinite wait. Additionally, only wait on external jobs that - // spawned new process groups (i.e. JOB_CONTROL). We do not break or return on error as we - // wait on only one pgrp at a time and we need to check all pgrps before returning, but we - // never wait/block on fg processes after an error has been encountered to give ourselves - // (elsewhere) a chance to handle the fallout from process termination, etc. - if (!has_error && block_on_fg && j == job_fg) { - debug(4, "Waiting on processes from foreground job %d", job_fg->pgid); - options &= ~WNOHANG; - } - - // Child jobs (produced via execution of functions) share job ids with their not-yet- - // fully-constructed parent jobs, so we have to wait on these by individual process id - // and not by the shared pgroup. End result is the same, but it just makes more calls - // to the kernel. - bool wait_by_process = !j->job_chain_is_fully_constructed(); - - // Firejail can result in jobs with pgroup 0, in which case we cannot wait by - // job id. See discussion in #5295. - if (j->pgid == 0) { - wait_by_process = true; - } - - // Cygwin does some voodoo with regards to process management that I do not understand, but - // long story short, we cannot reap processes by their pgroup. The way child processes are - // launched under Cygwin is... weird, and outwardly they do not appear to retain information - // about their parent process when viewed in Task Manager. Waiting on processes by their - // pgroup results in never reaping any, so we just wait on them by process id instead. - if (is_cygwin()) { - wait_by_process = true; - } - - // When waiting on processes individually in a pipeline, we need to enumerate in reverse - // order so that the first process we actually wait on (i.e. ~WNOHANG) is the last process - // in the IO chain, because that's the one that controls the lifetime of the foreground job - // - as long as it is still running, we are in the background and once it exits or is - // killed, all previous jobs in the IO pipeline must necessarily terminate as well. - auto process = j->processes.rbegin(); - // waitpid(2) returns 1 process each time, we need to keep calling it until we've reaped all - // children of the pgrp in question or else we can't reset the dirty_state flag. In all - // cases, calling waitpid(2) is faster than potentially calling select_try() on a process - // that has exited, which will force us to wait the full timeout before coming back here and - // calling waitpid() again. - while (true) { - int status; - pid_t pid; - - if (wait_by_process) { - // If the evaluation of a function resulted in the sharing of a pgroup between the - // real job and the job that shouldn't have been created as a separate job AND the - // parent job is still under construction (which is the case when continue_job() is - // first called on the child job during the recursive call to exec_job() before the - // parent job has been fully constructed), we need to call waitpid(2) on the - // individual processes of the child job instead of using a catch-all waitpid(2) - // call on the job's process group. - if (process == j->processes.rend()) { - break; - } - assert((*process)->pid != INVALID_PID && "Waiting by process on an invalid PID!"); - if ((*process)->completed) { - // This process has already been waited on to completion - process++; - continue; - } - - if ((options & WNOHANG) == 0) { - debug(4, "Waiting on individual process %d: %ls", (*process)->pid, (*process)->argv0()); - } else { - debug(4, "waitpid with WNOHANG on individual process %d", (*process)->pid); - } - pid = waitpid((*process)->pid, &status, options); - - process++; - } else { - // A negative PID passed in to `waitpid()` means wait on any child in that process - // group - pid = waitpid(-1 * j->pgid, &status, options); - } - - if (pid > 0) { - // A child process has been reaped - debug(4, "Reaped PID %d", pid); - handle_child_status(pid, status); - - // Always set WNOHANG (that is, don't hang). Otherwise we might wait on a non-stopped job - // that becomes stopped, but we don't refresh our view of the process state before - // calling waitpid(2) again here. - options |= WNOHANG; - } else if (pid == 0 || errno == ECHILD) { - // No killed/dead children in this particular process group - if (!wait_by_process) { - if ((options & WNOHANG) == 0) { - // This normally implies that the job has completed, but if we try to wait - // on a job that includes a process that changed its own group before we - // enter `waitpid`, we will be waiting forever. See #5596 for such a case. - wait_by_process = true; - continue; + if (proc->gens_[*mtopic] < gens[*mtopic]) { + // Potentially reapable. Update its gen count and try reaping it. + proc->gens_[*mtopic] = gens[*mtopic]; + if (proc->internal_proc_) { + // Try reaping an internal process. + if (proc->internal_proc_->exited()) { + proc->status = proc->internal_proc_->get_status(); + proc->completed = true; + } + } else if (proc->pid > 0) { + // Try reaping an external process. + int status = -1; + auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED); + if (pid > 0) { + assert(pid == proc->pid && "Unexpcted waitpid() return"); + debug(4, "Reaped PID %d", pid); + handle_child_status(pid, status); + } + } else { + assert(0 && "Don't know how to reap this process"); } - break; } - } else { - // pid < 0 indicates an error. One likely failure is ECHILD (no children), which is - // not an error and is ignored in the branch above. The other likely failure is - // EINTR, which means we got a signal, which is considered an error. We absolutely - // do not break or return on error, as we need to iterate over all constructed jobs - // but we only call waitpid for one pgrp at a time. We do bypass future waits in - // case of error, however. - has_error = true; - - // Do not audibly complain on interrupt (see #5293) - if (errno != EINTR) { - wperror(L"waitpid in process_mark_finished_children"); - } - break; } } } @@ -559,28 +450,6 @@ static bool process_mark_finished_children(bool block_on_fg) { s_disowned_pids.erase(std::remove_if(s_disowned_pids.begin(), s_disowned_pids.end(), [&status](pid_t pid) { return waitpid(pid, &status, WNOHANG) > 0; }), s_disowned_pids.end()); - - // Yes, the below can be collapsed to a single line, but it's worth being explicit about it with - // the comments. Fret not, the compiler will optimize it. (It better!) - if (jobs_skipped) { - // We received SIGCHLD but were not able to definitely say whether or not all children were - // reaped. - dirty_state = true; - } else { - // We can safely assume that no SIGCHLD means we can just return next time around - dirty_state = false; - } - - return !has_error; -} - -/// This is called from a signal handler. The signal is always SIGCHLD. -void job_handle_signal(int signal, siginfo_t *info, void *context) { - UNUSED(signal); - UNUSED(info); - UNUSED(context); - // This is the only place that this generation count is modified. It's OK if it overflows. - s_sigchld_generation_cnt += 1; } /// Given a command like "cat file", truncate it to a reasonable length. @@ -1053,15 +922,20 @@ void job_t::continue_job(bool send_sigcont) { } } - if (is_foreground()) { - if (is_completed()) { - // Set $status only if we are in the foreground and the last process in the job has - // finished and is not a short-circuited builtin. - auto &p = processes.back(); - if ((WIFEXITED(p->status) || WIFSIGNALED(p->status)) && p->pid) { - int status = proc_format_status(p->status); - proc_set_last_status(get_flag(job_flag_t::NEGATE) ? !status : status); - } + if (is_foreground() && is_completed()) { + // Set $status only if we are in the foreground and the last process in the job has + // finished and is not a short-circuited builtin. + bool negate = get_flag(job_flag_t::NEGATE); + auto &p = processes.back(); + if (p->internal_proc_) { + // Here the status is synthetic - not associated with a real exited process. + // TODO: clean this up, we shouldn't store the process's exit status in an unparsed + // state. + int status = p->status; + proc_set_last_status(negate ? !status : status); + } else if ((WIFEXITED(p->status) || WIFSIGNALED(p->status)) && p->pid) { + int status = proc_format_status(p->status); + proc_set_last_status(negate ? !status : status); } } } diff --git a/src/proc.h b/src/proc.h index 6950a7031..c588cf766 100644 --- a/src/proc.h +++ b/src/proc.h @@ -20,6 +20,7 @@ #include "io.h" #include "parse_tree.h" #include "tnode.h" +#include "topic_monitor.h" /// Types of processes. enum process_type_t { @@ -41,6 +42,28 @@ enum { JOB_CONTROL_NONE, }; +/// A structure representing a "process" internal to fish. This is backed by a pthread instead of a +/// separate process. +class internal_proc_t { + /// Whether the process has exited. + std::atomic exited_{}; + + /// If the process has exited, its status code. + std::atomic status_{}; + + public: + /// \return if this process has exited. + bool exited() const { return exited_.load(std::memory_order_relaxed); } + + /// Mark this process as exited, with the given status. + void mark_exited(int status); + + int get_status() const { + assert(exited() && "Process is not exited"); + return status_.load(std::memory_order_acquire); + } +}; + /// A structure representing a single fish process. Contains variables for tracking process state /// and the process argument list. Actually, a fish process can be either a regular external /// process, an internal builtin which may or may not spawn a fake IO process during execution, a @@ -112,10 +135,23 @@ class process_t { void set_io_chain(const io_chain_t &chain) { this->process_io_chain = chain; } + /// Store the current topic generations. That is, right before the process is launched, record + /// the generations of all topics; then we can tell which generation values have changed after + /// launch. This helps us avoid spurious waitpid calls. + void check_generations_before_launch(); + /// Actual command to pass to exec in case of EXTERNAL or INTERNAL_EXEC. wcstring actual_cmd; + + /// Generation counts for reaping. + generation_list_t gens_{}; + /// Process ID pid_t pid{0}; + + /// If we are an "internal process," that process. + std::shared_ptr internal_proc_{}; + /// File descriptor that pipe output should bind to. int pipe_write_fd{0}; /// True if process has completed. @@ -153,6 +189,13 @@ enum class job_flag_t { JOB_CONTROL, /// Whether the job wants to own the terminal when in the foreground. TERMINAL, + + JOB_FLAG_COUNT +}; + +template <> +struct enum_info_t { + static constexpr auto count = job_flag_t::JOB_FLAG_COUNT; }; typedef int job_id_t; @@ -193,6 +236,31 @@ class job_t { /// Sets the command. void set_command(const wcstring &cmd) { command_str = cmd; } + /// \return whether it is OK to reap a given process. Sometimes we want to defer reaping a + /// process if it is the group leader and the job is not yet constructed, because then we might + /// also reap the process group and then we cannot add new processes to the group. + bool can_reap(const process_t *p) const { + // Internal processes can always be reaped. + if (p->internal_proc_) { + return true; + } else if (p->pid <= 0) { + // Can't reap without a pid. + return false; + } else if (!is_constructed() && pgid > 0 && p->pid == pgid) { + // p is the the group leader in an under-construction job. + return false; + } else { + return true; + } + } + + /// \returns the reap topic for a process, which describes the manner in which we are reaped. A + /// none returns means don't reap, or perhaps defer reaping. + maybe_t reap_topic_for_process(const process_t *p) const { + if (p->completed || !can_reap(p)) return none(); + return p->internal_proc_ ? topic_t::internal_exit : topic_t::sigchld; + } + /// Returns a truncated version of the job string. Used when a message has already been emitted /// containing the full job string and job id, but using the job id alone would be confusing /// due to reuse of freed job ids. Prevents overloading the debug comments with the full, @@ -352,9 +420,6 @@ int proc_get_last_status(); /// \param interactive whether interactive jobs should be reaped as well bool job_reap(bool interactive); -/// Signal handler for SIGCHLD. Mark any processes with relevant information. -void job_handle_signal(int signal, siginfo_t *info, void *con); - /// Mark a process as failed to execute (and therefore completed). void job_mark_process_as_failed(const std::shared_ptr &job, const process_t *p); diff --git a/src/redirection.cpp b/src/redirection.cpp index 64acdda23..c0c1b6985 100644 --- a/src/redirection.cpp +++ b/src/redirection.cpp @@ -88,3 +88,23 @@ maybe_t dup2_list_t::resolve_chain(const io_chain_t &io_chain) { } return {std::move(result)}; } + +int dup2_list_t::fd_for_target_fd(int target) const { + // Paranoia. + if (target < 0) { + return target; + } + // Note we can simply walk our action list backwards, looking for src -> target dups. + int cursor = target; + for (auto iter = actions_.rbegin(); iter != actions_.rend(); ++iter) { + if (iter->target == cursor) { + // cursor is replaced by iter->src + cursor = iter->src; + } else if (iter->src == cursor && iter->target < 0) { + // cursor is closed. + cursor = -1; + break; + } + } + return cursor; +} diff --git a/src/redirection.h b/src/redirection.h index 64fd60043..16e8c155b 100644 --- a/src/redirection.h +++ b/src/redirection.h @@ -60,6 +60,11 @@ class dup2_list_t { /// The result contains the list of fd actions (dup2 and close), as well as the list /// of fds opened. static maybe_t resolve_chain(const io_chain_t &); + + /// \return the fd ultimately dup'd to a target fd, or -1 if the target is closed. + /// For example, if target fd is 1, and we have a dup2 chain 5->3 and 3->1, then we will return 5. + /// If the target is not referenced in the chain, returns target. + int fd_for_target_fd(int target) const; }; #endif diff --git a/src/signal.cpp b/src/signal.cpp index f1ab10581..8c2886650 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -15,6 +15,7 @@ #include "parser.h" #include "proc.h" #include "reader.h" +#include "topic_monitor.h" #include "wutil.h" // IWYU pragma: keep /// Struct describing an entry for the lookup table used to convert between signal names and signal @@ -230,6 +231,7 @@ static void handle_hup(int sig, siginfo_t *info, void *context) { } else { reader_exit(1, 1); } + topic_monitor_t::principal().post(topic_t::sighupint); } /// Handle sigterm. The only thing we do is restore the front process ID, then die. @@ -248,6 +250,7 @@ static void handle_int(int sig, siginfo_t *info, void *context) { if (reraise_if_forked_child(sig)) return; reader_handle_sigint(); default_handler(sig, info, context); + topic_monitor_t::principal().post(topic_t::sighupint); } /// Non-interactive ^C handler. @@ -260,8 +263,8 @@ static void handle_int_notinteractive(int sig, siginfo_t *info, void *context) { /// sigchld handler. Does notification and calls the handler in proc.c. static void handle_chld(int sig, siginfo_t *info, void *context) { if (reraise_if_forked_child(sig)) return; - job_handle_signal(sig, info, context); default_handler(sig, info, context); + topic_monitor_t::principal().post(topic_t::sigchld); } // We have a sigalarm handler that does nothing. This is used in the signal torture test, to verify diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp new file mode 100644 index 000000000..2c28a7517 --- /dev/null +++ b/src/topic_monitor.cpp @@ -0,0 +1,114 @@ +#include "config.h" // IWYU pragma: keep + +#include "limits.h" +#include "topic_monitor.h" +#include "wutil.h" + +#include + +/// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a +/// pointless at-exit handler for the dtor. +static topic_monitor_t *const s_principal = new topic_monitor_t(); + +topic_monitor_t &topic_monitor_t::principal() { + // Do not attempt to move s_principal to a function-level static, it needs to be accessed from a + // signal handler so it must not be lazily created. + return *s_principal; +} + +topic_monitor_t::topic_monitor_t() { + // Set up our pipes. Assert it succeeds. + auto pipes = make_autoclose_pipes({}); + assert(pipes.has_value() && "Failed to make pubsub pipes"); + pipes_ = pipes.acquire(); + + // Make sure that our write side doesn't block, else we risk hanging in a signal handler. + // The read end must block to avoid spinning in await. + DIE_ON_FAILURE(make_fd_nonblocking(pipes_.write.fd())); +} + +topic_monitor_t::~topic_monitor_t() = default; + +void topic_monitor_t::post(topic_t topic) { + // Beware, we may be in a signal handler! + // Atomically update the pending topics. + auto rawtopics = topic_set_t{topic}.to_raw(); + auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed); + if ((oldtopics & rawtopics) == rawtopics) { + // No new bits were set. + return; + } + + // Ok, we changed one or more bits. Ensure the topic change is visible, and announce the change + // by writing a byte to the pipe. + std::atomic_thread_fence(std::memory_order_release); + ssize_t ret; + do { + // write() is async signal safe. + const uint8_t v = 0; + ret = write(pipes_.write.fd(), &v, sizeof v); + } while (ret < 0 && errno == EINTR); + // Ignore EAGAIN and other errors (which conceivably could occur during shutdown). +} + +void topic_monitor_t::await_metagen(generation_t mgen) { + // Fast check of the metagen before taking the lock. If it's changed we're done. + if (mgen != current_metagen()) return; + + // Take the lock (which may take a long time) and then check again. + std::unique_lock locker{wait_queue_lock_}; + if (mgen != current_metagen()) return; + + // Our metagen hasn't changed. Push our metagen onto the queue, then wait until we're the + // lowest. If multiple waiters are the lowest, then anyone can be the observer. + // Note the reason for picking the lowest metagen is to avoid a priority inversion where a lower + // metagen (therefore someone who should see changes) is blocked waiting for a higher metagen + // (who has already seen the changes). + wait_queue_.push(mgen); + while (wait_queue_.top() != mgen) { + wait_queue_notifier_.wait(locker); + } + wait_queue_.pop(); + + // We now have the lowest metagen in the wait queue. Notice we still hold the lock. + // Read until the metagen changes. It may already have changed. + // Note because changes are coalesced, we can read a lot, potentially draining the pipe. + while (mgen == current_metagen()) { + uint8_t ignored[PIPE_BUF]; + (void)read(pipes_.read.fd(), ignored, sizeof ignored); + } + + // Release the lock and wake up the remaining waiters. + locker.unlock(); + wait_queue_notifier_.notify_all(); +} + +topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) { + if (topics.none()) return topics; + + topic_set_t changed{}; + for (;;) { + // Load the topic list and see if anything has changed. + generation_list_t current = updated_gens(); + for (topic_t topic : topic_iter_t{}) { + if (topics.get(topic)) { + assert(gens->at(topic) <= current.at(topic) && + "Incoming gen count exceeded published count"); + if (gens->at(topic) < current.at(topic)) { + gens->at(topic) = current.at(topic); + changed.set(topic); + } + } + } + + // If we're not waiting, or something changed, then we're done. + if (!wait || changed.any()) { + break; + } + + // Try again. Note that we use the metagen corresponding to the topic list we just + // inspected, not the current one (which may have updates since we checked). + await_metagen(metagen_for(current)); + } + return changed; +} diff --git a/src/topic_monitor.h b/src/topic_monitor.h new file mode 100644 index 000000000..78498c4cc --- /dev/null +++ b/src/topic_monitor.h @@ -0,0 +1,167 @@ +#ifndef FISH_TOPIC_MONITOR_H +#define FISH_TOPIC_MONITOR_H + +#include "common.h" +#include "enum_set.h" +#include "io.h" + +#include +#include +#include +#include +#include +#include +#include + +/** Topic monitoring support. Topics are conceptually "a thing that can happen." For example, + delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means + that that thing happened. + + Associated with each topic is a current generation, which is a 64 bit value. When you query a + topic, you get back a generation. If on the next query the generation has increased, then it + indicates someone posted to the topic. + + For example, if you are monitoring a child process, you can query the sigchld topic. If it has + increased since your last query, it is possible that your child process has exited. + + Topic postings may be coalesced. That is there may be two posts to a given topic, yet the + generation only increases by 1. The only guarantee is that after a topic post, the current + generation value is larger than any value previously queried. + + Tying this all together is the topic_monitor_t. This provides the current topic generations, and + also provides the ability to perform a blocking wait for any topic to change in a particular topic + set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit. + */ + +/// The list of topics that may be observed. +enum class topic_t : uint8_t { + sigchld, // Corresponds to SIGCHLD signal. + sighupint, // Corresponds to both SIGHUP and SIGINT signals. + internal_exit, // Corresponds to an internal process exit. + COUNT +}; + +/// Allow enum_iter to be used. +template <> +struct enum_info_t { + static constexpr auto count = topic_t::COUNT; +}; + +/// Set of topics. +using topic_set_t = enum_set_t; + +/// Counting iterator for topics. +using topic_iter_t = enum_iter_t; + +/// A generation is a counter incremented every time the value of a topic changes. +/// It is 64 bit so it will never wrap. +using generation_t = uint64_t; + +/// A generation value which is guaranteed to never be set and be larger than any valid generation. +constexpr generation_t invalid_generation = std::numeric_limits::max(); + +/// List of generation values, indexed by topics. +/// The initial value of a generation is always 0. +using generation_list_t = enum_array_t; + +/// Teh topic monitor class. This permits querying the current generation values for topics, +/// optionally blocking until they increase. +class topic_monitor_t { + private: + using topic_set_raw_t = uint8_t; + + static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count(), + "topic_set_raw is too small"); + + /// The current topic generation list, protected by a mutex. Note this may be opportunistically + /// updated at the point it is queried. + owning_lock current_gen_{{}}; + + /// The set of topics which have pending increments. + /// This is managed via atomics. + std::atomic pending_updates_{}; + + /// When a topic set is queried in a blocking way, the waiters are put into a queue. The waiter + /// with the smallest metagen is responsible for announcing the change to the rest of the + /// waiters. (The metagen is just the sum of the current generations.) Note that this is a + /// max-heap that defaults to std::less; by using std::greater it becomes a min heap. This is + /// protected by wait_queue_lock_. + std::priority_queue, std::greater> + wait_queue_; + + /// Mutex guarding the wait queue. + std::mutex wait_queue_lock_{}; + + /// Condition variable for broadcasting notifications. + std::condition_variable wait_queue_notifier_{}; + + /// Pipes used to communicate changes from the signal handler. + autoclose_pipes_t pipes_; + + /// \return the metagen for a topic generation list. + /// The metagen is simply the sum of topic generations. Note it is monotone. + static inline generation_t metagen_for(const generation_list_t &lst) { + return std::accumulate(lst.begin(), lst.end(), generation_t{0}); + } + + /// Wait for the current metagen to become different from \p gen. + /// If it is already different, return immediately. + void await_metagen(generation_t gen); + + /// Return the current generation list, opportunistically applying any pending updates. + generation_list_t updated_gens() { + auto current_gens = current_gen_.acquire(); + + // Atomically acquire the pending updates, swapping in 0. + // If there are no pending updates (likely), just return. + // Otherwise CAS in 0 and update our topics. + const auto relaxed = std::memory_order_relaxed; + topic_set_raw_t raw; + bool cas_success; + do { + raw = pending_updates_.load(relaxed); + if (raw == 0) return *current_gens; + cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed); + } while (!cas_success); + + // Update the current generation with our topics and return it. + auto topics = topic_set_t::from_raw(raw); + for (topic_t topic : topic_iter_t{}) { + current_gens->at(topic) += topics.get(topic) ? 1 : 0; + } + return *current_gens; + } + + /// \return the metagen for the current topic generation list. + inline generation_t current_metagen() { return metagen_for(updated_gens()); } + + public: + topic_monitor_t(); + ~topic_monitor_t(); + + /// topic_monitors should not be copied, and there should be no reason to move one. + void operator=(const topic_monitor_t &) = delete; + topic_monitor_t(const topic_monitor_t &) = delete; + void operator=(topic_monitor_t &&) = delete; + topic_monitor_t(topic_monitor_t &&) = delete; + + /// The principal topic_monitor. This may be fetched from a signal handler. + static topic_monitor_t &principal(); + + /// Post to a topic, potentially from a signal handler. + void post(topic_t topic); + + /// Access the current generations. + generation_list_t current_generations() { return updated_gens(); } + + /// Access the generation for a topic. + generation_t generation_for_topic(topic_t topic) { return current_generations().at(topic); } + + /// See if for any topic (specified in \p topics) has changed from the values in the generation + /// list \p gens. If \p wait is set, then wait if there are no changes; otherwise return + /// immediately. + /// \return the set of topics that changed, updating the generation list \p gens. + topic_set_t check(generation_list_t *gens, topic_set_t topics, bool wait); +}; + +#endif