Implement new event mechanism and migrate builtin and block output

This commit merges support for a new event publishing mechanism
"topic_monitor" that allow for waiting on multiple event types. It then
replaces waitpid() logic inside `process_mark_finished_children` with new
and simpler logic built around this mechanims. Lastly it migrates the
builtin and function output from processes to background threads.
This commit is contained in:
ridiculousfish
2019-02-17 21:42:18 -08:00
18 changed files with 878 additions and 387 deletions

View File

@@ -80,7 +80,7 @@ SET(FISH_SRCS
src/postfork.cpp src/proc.cpp src/reader.cpp src/sanity.cpp src/screen.cpp
src/signal.cpp src/tinyexpr.cpp src/tnode.cpp src/tokenizer.cpp src/utf8.cpp src/util.cpp
src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp
src/future_feature_flags.cpp src/redirection.cpp
src/future_feature_flags.cpp src/redirection.cpp src/topic_monitor.cpp
)
# Header files are just globbed.

View File

@@ -119,7 +119,7 @@ FISH_OBJS := obj/autoload.o obj/builtin.o obj/builtin_bg.o obj/builtin_bind.o ob
obj/parser_keywords.o obj/path.o obj/postfork.o obj/proc.o obj/reader.o \
obj/sanity.o obj/screen.o obj/signal.o obj/tinyexpr.o obj/tokenizer.o obj/tnode.o obj/utf8.o \
obj/util.o obj/wcstringutil.o obj/wgetopt.o obj/wildcard.o obj/wutil.o \
obj/future_feature_flags.o obj/redirection.o
obj/future_feature_flags.o obj/redirection.o obj/topic_monitor.o
FISH_INDENT_OBJS := obj/fish_indent.o obj/print_help.o $(FISH_OBJS)

View File

@@ -307,6 +307,12 @@ void vec_append(std::vector<T> &receiver, std::vector<T> &&donator) {
std::make_move_iterator(donator.end()));
}
/// Move an object into a shared_ptr.
template <typename T>
std::shared_ptr<T> move_to_sharedptr(T &&v) {
return std::make_shared<T>(std::move(v));
}
/// Print a stack trace to stderr.
void show_stackframe(const wchar_t msg_level, int frame_count = 100, int skip_levels = 0);

View File

@@ -1,26 +1,111 @@
#pragma once
#include <bitset>
#include <cassert>
#include <iterator>
/// A type (to specialize) that provides a count for an enum.
/// Example:
/// template<> struct enum_info_t<MyEnum>
/// { static constexpr auto count = MyEnum::COUNT; };
template <typename T>
class enum_set_t {
struct enum_info_t {};
/// \return the count of an enum.
template <typename T>
constexpr size_t enum_count() {
return static_cast<size_t>(enum_info_t<T>::count);
}
/// A bit set indexed by an enum type.
template <typename T>
class enum_set_t : private std::bitset<enum_count<T>()> {
private:
using super = std::bitset<enum_count<T>()>;
static size_t index_of(T t) { return static_cast<size_t>(t); }
explicit enum_set_t(unsigned long raw) : super(raw) {}
public:
enum_set_t() = default;
/*implicit*/ enum_set_t(T v) { set(v); }
/*implicit*/ enum_set_t(std::initializer_list<T> vs) {
for (T v : vs) set(v);
}
static enum_set_t from_raw(unsigned long v) { return enum_set_t{v}; }
unsigned long to_raw() const { return super::to_ulong(); }
bool get(T t) const { return super::test(index_of(t)); }
void set(T t, bool v = true) { super::set(index_of(t), v); }
void clear(T t) { super::reset(index_of(t)); }
bool none() const { return super::none(); }
bool any() const { return super::any(); }
bool operator==(const enum_set_t &rhs) const { return super::operator==(rhs); }
bool operator!=(const enum_set_t &rhs) const { return super::operator!=(rhs); }
};
/// An array of Elem indexed by an enum class.
template <typename Elem, typename T>
class enum_array_t : public std::array<Elem, enum_count<T>()> {
using super = std::array<Elem, enum_count<T>()>;
using base_type_t = typename std::underlying_type<T>::type;
std::bitset<8 * sizeof(base_type_t)> bitmask{0};
static int index_of(T t) { return static_cast<base_type_t>(t); }
public:
bool get(T t) const { return bitmask.test(index_of(t)); }
void set(T t, bool v) {
if (v) {
bitmask.set(index_of(t));
} else {
bitmask.reset(index_of(t));
}
}
void set(T t) { bitmask.set(index_of(t)); }
void clear(T t) { bitmask.reset(index_of(t)); }
Elem &at(T t) { return super::at(index_of(t)); }
const Elem &at(T t) const { return super::at(index_of(t)); }
Elem &operator[](T t) { return super::operator[](index_of(t)); }
const Elem &operator[](T t) const { return super::operator[](index_of(t)); }
};
/// A counting iterator for an enum class.
/// This enumerates the values of an enum class from 0 up to (not including) count.
/// Example:
/// for (auto v : enum_iter_t<MyEnum>) {...}
template <typename T>
class enum_iter_t {
using base_type_t = typename std::underlying_type<T>::type;
struct iterator_t {
friend class enum_iter_t;
using iterator_category = std::forward_iterator_tag;
using value_type = T;
using difference_type = long;
explicit iterator_t(base_type_t v) : v_(v) {}
T operator*() const { return static_cast<T>(v_); }
const T *operator->() const { return static_cast<const T *>(v_); }
iterator_t &operator++() {
v_ += 1;
return *this;
}
iterator_t operator++(int) {
auto res = *this;
v_ += 1;
return res;
}
bool operator==(iterator_t rhs) const { return v_ == rhs.v_; }
bool operator!=(iterator_t rhs) const { return v_ != rhs.v_; }
private:
base_type_t v_{};
};
public:
iterator_t begin() const { return iterator_t{0}; }
iterator_t end() const { return iterator_t{static_cast<base_type_t>(enum_count<T>())}; }
};

View File

@@ -34,6 +34,7 @@
#include "fallback.h" // IWYU pragma: keep
#include "function.h"
#include "io.h"
#include "iothread.h"
#include "parse_tree.h"
#include "parser.h"
#include "postfork.h"
@@ -55,16 +56,6 @@
/// Base open mode to pass to calls to open.
#define OPEN_MASK 0666
/// Called in a forked child.
static void exec_write_and_exit(int fd, const char *buff, size_t count, int status) {
if (write_loop(fd, buff, count) == -1) {
debug(0, WRITE_ERROR);
wperror(L"write");
exit_without_destructors(status);
}
exit_without_destructors(status);
}
void exec_close(int fd) {
ASSERT_IS_MAIN_THREAD();
@@ -82,12 +73,11 @@ void exec_close(int fd) {
}
/// Returns true if the redirection is a file redirection to a file other than /dev/null.
static bool redirection_is_to_real_file(const io_data_t *io) {
static bool redirection_is_to_real_file(const shared_ptr<io_data_t> &io) {
bool result = false;
if (io != NULL && io->io_mode == io_mode_t::file) {
if (io && io->io_mode == io_mode_t::file) {
// It's a file redirection. Compare the path to /dev/null.
const io_file_t *io_file = static_cast<const io_file_t *>(io);
const char *path = io_file->filename_cstr;
const char *path = static_cast<const io_file_t *>(io.get())->filename_cstr;
if (strcmp(path, "/dev/null") != 0) {
// It's not /dev/null.
result = true;
@@ -361,6 +351,98 @@ static void on_process_created(const std::shared_ptr<job_t> &j, pid_t child_pid)
}
}
/// Construct an internal process for the process p. In the background, write the data \p outdata to
/// stdout and \p errdata to stderr, respecting the io chain \p ios. For example if target_fd is 1
/// (stdout), and there is a dup2 3->1, then we need to write to fd 3. Then exit the internal
/// process.
static bool run_internal_process(process_t *p, std::string outdata, std::string errdata,
io_chain_t ios) {
p->check_generations_before_launch();
// We want both the dup2s and the io_chain_ts to be kept alive by the background thread, because
// they may own an fd that we want to write to. Move them all to a shared_ptr. The strings as
// well (they may be long).
// Construct a little helper struct to make it simpler to move into our closure without copying.
struct write_fields_t {
int src_outfd{-1};
std::string outdata{};
int src_errfd{-1};
std::string errdata{};
io_chain_t ios{};
maybe_t<dup2_list_t> dup2s{};
std::shared_ptr<internal_proc_t> internal_proc{};
int success_status{};
bool skip_out() const { return outdata.empty() || src_outfd < 0; }
bool skip_err() const { return errdata.empty() || src_errfd < 0; }
};
auto f = std::make_shared<write_fields_t>();
f->outdata = std::move(outdata);
f->errdata = std::move(errdata);
// Construct and assign the internal process to the real process.
p->internal_proc_ = std::make_shared<internal_proc_t>();
f->internal_proc = p->internal_proc_;
// Resolve the IO chain.
// Note it's important we do this even if we have no out or err data, because we may have been
// asked to truncate a file (e.g. `echo -n '' > /tmp/truncateme.txt'). The open() in the dup2
// list resolution will ensure this happens.
f->dup2s = dup2_list_t::resolve_chain(ios);
if (!f->dup2s) {
return false;
}
// Figure out which source fds to write to. If they are closed (unlikely) we just exit
// successfully.
f->src_outfd = f->dup2s->fd_for_target_fd(STDOUT_FILENO);
f->src_errfd = f->dup2s->fd_for_target_fd(STDERR_FILENO);
// If we have nothing to right we can elide the thread.
// TODO: support eliding output to /dev/null.
if (f->skip_out() && f->skip_err()) {
f->internal_proc->mark_exited(EXIT_SUCCESS);
return true;
}
// Ensure that ios stays alive, it may own fds.
f->ios = ios;
// If our process is a builtin, it will have already set its status value. Make sure we
// propagate that if our I/O succeeds and don't read it on a background thread. TODO: have
// builtin_run provide this directly, rather than setting it in the process.
f->success_status = p->status;
iothread_perform([f]() {
int status = f->success_status;
if (!f->skip_out()) {
ssize_t ret = write_loop(f->src_outfd, f->outdata.data(), f->outdata.size());
if (ret < 0) {
if (errno != EPIPE) {
wperror(L"write");
}
if (!status) status = 1;
}
}
if (!f->skip_err()) {
ssize_t ret = write_loop(f->src_errfd, f->errdata.data(), f->errdata.size());
if (ret < 0) {
if (errno != EPIPE) {
wperror(L"write");
}
if (!status) status = 1;
}
}
f->internal_proc->mark_exited(status);
});
return true;
}
/// Call fork() as part of executing a process \p p in a job \j. Execute \p child_action in the
/// context of the child. Returns true if fork succeeded, false if fork failed.
static bool fork_child_for_process(const std::shared_ptr<job_t> &job, process_t *p,
@@ -505,71 +587,71 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr<j
static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
io_chain_t *io_chain, const io_streams_t &builtin_io_streams) {
assert(p->type == INTERNAL_BUILTIN && "Process is not a builtin");
// Handle output from builtin commands. In the general case, this means forking of a
// worker process, that will write out the contents of the stdout and stderr buffers
// to the correct file descriptor. Since forking is expensive, fish tries to avoid
// it when possible.
bool fork_was_skipped = false;
const shared_ptr<io_data_t> stdout_io = io_chain->get_io_for_fd(STDOUT_FILENO);
const shared_ptr<io_data_t> stderr_io = io_chain->get_io_for_fd(STDERR_FILENO);
const output_stream_t &stdout_stream = builtin_io_streams.out;
const output_stream_t &stderr_stream = builtin_io_streams.err;
// If we are outputting to a file, we have to actually do it, even if we have no
// output, so that we can truncate the file. Does not apply to /dev/null.
bool must_fork = redirection_is_to_real_file(stdout_io.get()) ||
redirection_is_to_real_file(stderr_io.get());
if (!must_fork && p->is_last_in_job) {
// We are handling reads directly in the main loop. Note that we may still end
// up forking.
const bool stdout_is_bufferfill =
(stdout_io && stdout_io->io_mode == io_mode_t::bufferfill);
const std::shared_ptr<io_buffer_t> stdout_buffer =
stdout_is_bufferfill ? static_cast<io_bufferfill_t *>(stdout_io.get())->buffer()
: nullptr;
const bool no_stdout_output = stdout_stream.empty();
const bool no_stderr_output = stderr_stream.empty();
const bool stdout_discarded = stdout_stream.buffer().discarded();
// Mark if we discarded output.
if (stdout_stream.buffer().discarded()) p->status = STATUS_READ_TOO_MUCH;
if (!stdout_discarded && no_stdout_output && no_stderr_output) {
// The builtin produced no output and is not inside of a pipeline. No
// need to fork or even output anything.
debug(4, L"Skipping fork: no output for internal builtin '%ls'", p->argv0());
fork_was_skipped = true;
} else if (no_stderr_output && stdout_buffer) {
// The builtin produced no stderr, and its stdout is going to an
// internal buffer. There is no need to fork. This helps out the
// performance quite a bit in complex completion code.
// TODO: we're sloppy about handling explicitly separated output.
// Theoretically we could have explicitly separated output on stdout and
// also stderr output; in that case we ought to thread the exp-sep output
// through to the io buffer. We're getting away with this because the only
// thing that can output exp-sep output is `string split0` which doesn't
// also produce stderr.
debug(4, L"Skipping fork: buffered output for internal builtin '%ls'", p->argv0());
// We will try to elide constructing an internal process. However if the output is going to a
// real file, we have to do it. For example in `echo -n > file.txt` we proceed to open file.txt
// even though there is no output, so that it is properly truncated.
const shared_ptr<io_data_t> stdout_io = io_chain->get_io_for_fd(STDOUT_FILENO);
const shared_ptr<io_data_t> stderr_io = io_chain->get_io_for_fd(STDERR_FILENO);
bool must_use_process =
redirection_is_to_real_file(stdout_io) || redirection_is_to_real_file(stderr_io);
stdout_buffer->append_from_stream(stdout_stream);
fork_was_skipped = true;
} else if (stdout_io.get() == NULL && stderr_io.get() == NULL) {
// We are writing to normal stdout and stderr. Just do it - no need to fork.
debug(4, L"Skipping fork: ordinary output for internal builtin '%ls'", p->argv0());
const std::string outbuff = wcs2string(stdout_stream.contents());
const std::string errbuff = wcs2string(stderr_stream.contents());
bool builtin_io_done =
do_builtin_io(outbuff.data(), outbuff.size(), errbuff.data(), errbuff.size());
if (!builtin_io_done && errno != EPIPE) {
redirect_tty_output(); // workaround glibc bug
debug(0, "!builtin_io_done and errno != EPIPE");
show_stackframe(L'E');
}
if (stdout_discarded) p->status = STATUS_READ_TOO_MUCH;
fork_was_skipped = true;
}
// If we are directing output to a buffer, then we can just transfer it directly without needing
// to write to the bufferfill pipe. Note this is how we handle explicitly separated stdout
// output (i.e. string split0) which can't really be sent through a pipe.
// TODO: we're sloppy about handling explicitly separated output.
// Theoretically we could have explicitly separated output on stdout and also stderr output; in
// that case we ought to thread the exp-sep output through to the io buffer. We're getting away
// with this because the only thing that can output exp-sep output is `string split0` which
// doesn't also produce stderr. Also note that we never send stderr to a buffer, so there's no
// need for a similar check for stderr.
bool stdout_done = false;
if (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill) {
auto stdout_buffer = static_cast<io_bufferfill_t *>(stdout_io.get())->buffer();
stdout_buffer->append_from_stream(stdout_stream);
stdout_done = true;
}
if (fork_was_skipped) {
// Figure out any data remaining to write. We may have none in which case we can short-circuit.
std::string outbuff = stdout_done ? std::string{} : wcs2string(stdout_stream.contents());
std::string errbuff = wcs2string(stderr_stream.contents());
// If we have no redirections for stdout/stderr, just write them directly.
if (!stdout_io && !stderr_io) {
bool did_err = false;
if (write_loop(STDOUT_FILENO, outbuff.data(), outbuff.size()) < 0) {
if (errno != EPIPE) {
did_err = true;
debug(0, "Error while writing to stdout");
wperror(L"write_loop");
}
}
if (write_loop(STDERR_FILENO, errbuff.data(), errbuff.size()) < 0) {
if (errno != EPIPE && !did_err) {
did_err = true;
debug(0, "Error while writing to stderr");
wperror(L"write_loop");
}
}
if (did_err) {
redirect_tty_output(); // workaround glibc bug
debug(0, "!builtin_io_done and errno != EPIPE");
show_stackframe(L'E');
}
// Clear the buffers to indicate we finished.
outbuff.clear();
errbuff.clear();
}
if (!must_use_process && outbuff.empty() && errbuff.empty()) {
// We do not need to construct a background process.
// TODO: factor this job-status-setting stuff into a single place.
p->completed = 1;
if (p->is_last_in_job) {
debug(4, L"Set status of job %d (%ls) to %d using short circuit", j->job_id,
@@ -578,37 +660,13 @@ static bool handle_builtin_output(const std::shared_ptr<job_t> &j, process_t *p,
int status = p->status;
proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status);
}
return true;
} else {
// Ok, unfortunately, we have to do a real fork. Bummer. We work hard to make
// sure we don't have to wait for all our threads to exit, by arranging things
// so that we don't have to allocate memory or do anything except system calls
// in the child.
//
// These strings may contain embedded nulls, so don't treat them as C strings.
const std::string outbuff_str = wcs2string(stdout_stream.contents());
const char *outbuff = outbuff_str.data();
size_t outbuff_len = outbuff_str.size();
const std::string errbuff_str = wcs2string(stderr_stream.contents());
const char *errbuff = errbuff_str.data();
size_t errbuff_len = errbuff_str.size();
// Resolve our IO chain to a sequence of dup2s.
auto dup2s = dup2_list_t::resolve_chain(*io_chain);
if (!dup2s) {
return false;
}
// Construct and run our background process.
fflush(stdout);
fflush(stderr);
if (!fork_child_for_process(j, p, *dup2s, false, "internal builtin", [&] {
do_builtin_io(outbuff, outbuff_len, errbuff, errbuff_len);
exit_without_destructors(p->status);
})) {
return false;
}
return run_internal_process(p, std::move(outbuff), std::move(errbuff), *io_chain);
}
return true;
}
/// Executes an external command.
@@ -784,24 +842,9 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr<job_t>
io_chain.remove(block_output_bufferfill);
auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill));
// Resolve our IO chain to a sequence of dup2s.
auto dup2s = dup2_list_t::resolve_chain(io_chain);
if (!dup2s) {
return false;
}
const std::string buffer_contents = block_output_buffer->buffer().newline_serialized();
const char *buffer = buffer_contents.data();
size_t count = buffer_contents.size();
if (count > 0) {
// We don't have to drain threads here because our child process is simple.
const char *fork_reason =
p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io";
if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] {
exec_write_and_exit(STDOUT_FILENO, buffer, count, status);
})) {
return false;
}
std::string buffer_contents = block_output_buffer->buffer().newline_serialized();
if (!buffer_contents.empty()) {
return run_internal_process(p, std::move(buffer_contents), {} /*errdata*/, io_chain);
} else {
if (p->is_last_in_job) {
proc_set_last_status(j->get_flag(job_flag_t::NEGATE) ? (!status) : status);
@@ -899,6 +942,7 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr<
}
// Execute the process.
p->check_generations_before_launch();
switch (p->type) {
case INTERNAL_FUNCTION:
case INTERNAL_BLOCK_NODE: {

View File

@@ -25,6 +25,7 @@
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>
#include <thread>
#include <algorithm>
#include <array>
@@ -69,6 +70,7 @@
#include "signal.h"
#include "tnode.h"
#include "tokenizer.h"
#include "topic_monitor.h"
#include "utf8.h"
#include "util.h"
#include "wcstringutil.h"
@@ -286,6 +288,48 @@ static void test_str_to_num() {
L"converting invalid num to long did not fail");
}
enum class test_enum { alpha, beta, gamma, COUNT };
template <>
struct enum_info_t<test_enum> {
static constexpr auto count = test_enum::COUNT;
};
static void test_enum_set() {
say(L"Testing enum set");
enum_set_t<test_enum> es;
do_test(es.none());
do_test(!es.any());
do_test(es.to_raw() == 0);
do_test(es == enum_set_t<test_enum>::from_raw(0));
do_test(es != enum_set_t<test_enum>::from_raw(1));
es.set(test_enum::beta);
do_test(es.to_raw() == 2);
do_test(es == enum_set_t<test_enum>::from_raw(2));
do_test(es == enum_set_t<test_enum>{test_enum::beta});
do_test(es != enum_set_t<test_enum>::from_raw(3));
do_test(es.any());
do_test(!es.none());
unsigned idx = 0;
for (auto v : enum_iter_t<test_enum>{}) {
do_test(static_cast<unsigned>(v) == idx);
idx++;
}
do_test(static_cast<unsigned>(test_enum::COUNT) == idx);
}
static void test_enum_array() {
say(L"Testing enum array");
enum_array_t<std::string, test_enum> es{};
do_test(es.size() == enum_count<test_enum>());
es[test_enum::beta] = "abc";
do_test(es[test_enum::beta] == "abc");
es.at(test_enum::gamma) = "def";
do_test(es.at(test_enum::gamma) == "def");
}
/// Test sane escapes.
static void test_unescape_sane() {
const struct test_t {
@@ -2378,6 +2422,30 @@ static void test_dup2s() {
do_test(!list.has_value());
}
static void test_dup2s_fd_for_target_fd() {
using std::make_shared;
io_chain_t chain;
// note io_fd_t params are backwards from dup2.
chain.push_back(make_shared<io_close_t>(10));
chain.push_back(make_shared<io_fd_t>(9, 10, true));
chain.push_back(make_shared<io_fd_t>(5, 8, true));
chain.push_back(make_shared<io_fd_t>(1, 4, true));
chain.push_back(make_shared<io_fd_t>(3, 5, true));
auto list = dup2_list_t::resolve_chain(chain);
do_test(list.has_value());
do_test(list->fd_for_target_fd(3) == 8);
do_test(list->fd_for_target_fd(5) == 8);
do_test(list->fd_for_target_fd(8) == 8);
do_test(list->fd_for_target_fd(1) == 4);
do_test(list->fd_for_target_fd(4) == 4);
do_test(list->fd_for_target_fd(100) == 100);
do_test(list->fd_for_target_fd(0) == 0);
do_test(list->fd_for_target_fd(-1) == -1);
do_test(list->fd_for_target_fd(9) == -1);
do_test(list->fd_for_target_fd(10) == -1);
}
/// Testing colors.
static void test_colors() {
say(L"Testing colors");
@@ -5031,6 +5099,68 @@ void test_normalize_path() {
do_test(path_normalize_for_cd(L"/abc/def/", L"../ghi/..") == L"/abc/ghi/..");
}
static void test_topic_monitor() {
say(L"Testing topic monitor");
topic_monitor_t monitor;
generation_list_t gens{};
constexpr auto t = topic_t::sigchld;
do_test(gens[t] == 0);
do_test(monitor.generation_for_topic(t) == 0);
auto changed = monitor.check(&gens, topic_set_t{t}, false /* wait */);
do_test(changed.none());
do_test(gens[t] == 0);
monitor.post(t);
changed = monitor.check(&gens, topic_set_t{t}, true /* wait */);
do_test(changed == topic_set_t{t});
do_test(gens[t] == 1);
do_test(monitor.generation_for_topic(t) == 1);
monitor.post(t);
do_test(monitor.generation_for_topic(t) == 2);
changed = monitor.check(&gens, topic_set_t{t}, true /* wait */);
do_test(changed == topic_set_t{t});
}
static void test_topic_monitor_torture() {
say(L"Torture-testing topic monitor");
topic_monitor_t monitor;
const size_t thread_count = 64;
constexpr auto t1 = topic_t::sigchld;
constexpr auto t2 = topic_t::sighupint;
std::vector<generation_list_t> gens;
gens.resize(thread_count, generation_list_t{});
std::atomic<uint32_t> post_count{};
for (auto &gen : gens) {
gen = monitor.current_generations();
post_count += 1;
monitor.post(t1);
}
std::atomic<uint32_t> completed{};
std::vector<std::thread> threads;
for (size_t i = 0; i < thread_count; i++) {
threads.emplace_back(
[&](size_t i) {
for (size_t j = 0; j < (1 << 11); j++) {
auto before = gens[i];
auto changed = monitor.check(&gens[i], topic_set_t{t1, t2}, true /* wait */);
do_test(before[t1] < gens[i][t1]);
do_test(gens[i][t1] <= post_count);
do_test(gens[i][t2] == 0);
}
auto amt = completed.fetch_add(1, std::memory_order_relaxed);
},
i);
}
while (completed.load(std::memory_order_relaxed) < thread_count) {
post_count += 1;
monitor.post(t1);
}
for (auto &t : threads) t.join();
}
/// Main test.
int main(int argc, char **argv) {
UNUSED(argc);
@@ -5082,6 +5212,8 @@ int main(int argc, char **argv) {
if (should_test_function("wcstring_tok")) test_wcstring_tok();
if (should_test_function("env_vars")) test_env_vars();
if (should_test_function("str_to_num")) test_str_to_num();
if (should_test_function("enum")) test_enum_set();
if (should_test_function("enum")) test_enum_array();
if (should_test_function("highlighting")) test_highlighting();
if (should_test_function("new_parser_ll2")) test_new_parser_ll2();
if (should_test_function("new_parser_fuzzing"))
@@ -5115,6 +5247,7 @@ int main(int argc, char **argv) {
if (should_test_function("test")) test_test();
if (should_test_function("wcstod")) test_wcstod();
if (should_test_function("dup2s")) test_dup2s();
if (should_test_function("dup2s")) test_dup2s_fd_for_target_fd();
if (should_test_function("path")) test_path();
if (should_test_function("pager_navigation")) test_pager_navigation();
if (should_test_function("pager_layout")) test_pager_layout();
@@ -5148,6 +5281,8 @@ int main(int argc, char **argv) {
if (should_test_function("maybe")) test_maybe();
if (should_test_function("layout_cache")) test_layout_cache();
if (should_test_function("normalize")) test_normalize_path();
if (should_test_function("topics")) test_topic_monitor();
if (should_test_function("topics")) test_topic_monitor_torture();
// history_tests_t::test_history_speed();
say(L"Encountered %d errors in low-level tests", err_count);

View File

@@ -13,6 +13,7 @@
#include "fallback.h" // IWYU pragma: keep
#include "io.h"
#include "iothread.h"
#include "redirection.h"
#include "wutil.h" // IWYU pragma: keep
io_data_t::~io_data_t() = default;
@@ -30,6 +31,7 @@ void io_pipe_t::print() const {
void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); }
void io_buffer_t::append_from_stream(const output_stream_t &stream) {
if (stream.empty()) return;
scoped_lock locker(append_lock_);
if (buffer_.discarded()) return;
if (stream.buffer().discarded()) {
@@ -122,7 +124,7 @@ void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) {
// We want our background thread to own the fd but it's not easy to move into a std::function.
// Use a shared_ptr.
auto fdref = std::make_shared<autoclose_fd_t>(std::move(fd));
auto fdref = move_to_sharedptr(std::move(fd));
// Our function to read until the receiver is closed.
// It's OK to capture 'this' by value because 'this' owns the background thread and joins it

View File

@@ -410,28 +410,25 @@ struct io_streams_t {
output_stream_t err;
// fd representing stdin. This is not closed by the destructor.
int stdin_fd;
int stdin_fd{-1};
// Whether stdin is "directly redirected," meaning it is the recipient of a pipe (foo | cmd) or
// direct redirection (cmd < foo.txt). An "indirect redirection" would be e.g. begin ; cmd ; end
// < foo.txt
bool stdin_is_directly_redirected;
bool stdin_is_directly_redirected{false};
// Indicates whether stdout and stderr are redirected (e.g. to a file or piped).
bool out_is_redirected;
bool err_is_redirected;
bool out_is_redirected{false};
bool err_is_redirected{false};
// Actual IO redirections. This is only used by the source builtin. Unowned.
const io_chain_t *io_chain;
const io_chain_t *io_chain{nullptr};
io_streams_t(size_t read_limit)
: out(read_limit),
err(read_limit),
stdin_fd(-1),
stdin_is_directly_redirected(false),
out_is_redirected(false),
err_is_redirected(false),
io_chain(NULL) {}
// io_streams_t cannot be copied.
io_streams_t(const io_streams_t &) = delete;
void operator=(const io_streams_t &) = delete;
explicit io_streams_t(size_t read_limit) : out(read_limit), err(read_limit), stdin_fd(-1) {}
};
#if 0

View File

@@ -2,6 +2,7 @@
#ifndef FISH_IOTHREAD_H
#define FISH_IOTHREAD_H
#include <pthread.h>
#include <functional>
#include <type_traits>

View File

@@ -379,27 +379,3 @@ void safe_report_exec_error(int err, const char *actual_cmd, const char *const *
}
}
}
/// Perform output from builtins. May be called from a forked child, so don't do anything that may
/// allocate memory, etc.
bool do_builtin_io(const char *out, size_t outlen, const char *err, size_t errlen) {
int saved_errno = 0;
bool success = true;
if (out && outlen && write_loop(STDOUT_FILENO, out, outlen) < 0) {
saved_errno = errno;
if (errno != EPIPE) {
debug_safe(0, "Error while writing to stdout");
errno = saved_errno;
safe_perror("write_loop");
}
success = false;
}
if (err && errlen && write_loop(STDERR_FILENO, err, errlen) < 0) {
saved_errno = errno;
success = false;
}
errno = saved_errno;
return success;
}

View File

@@ -41,9 +41,6 @@ int setup_child_process(process_t *p, const dup2_list_t &dup2s);
/// wait for threads to die.
pid_t execute_fork(bool wait_for_threads_to_die);
/// Perform output from builtins. Returns true on success.
bool do_builtin_io(const char *out, size_t outlen, const char *err, size_t errlen);
/// Report an error from failing to exec or posix_spawn a command.
void safe_report_exec_error(int err, const char *actual_cmd, const char *const *argv,
const char *const *envv);

View File

@@ -247,6 +247,13 @@ bool job_t::signal(int signal) {
return true;
}
void internal_proc_t::mark_exited(int status) {
assert(!exited() && "Process is already exited");
exited_.store(true, std::memory_order_relaxed);
status_.store(status, std::memory_order_release);
topic_monitor_t::principal().post(topic_t::internal_exit);
}
static void mark_job_complete(const job_t *j) {
for (auto &p : j->processes) {
p->completed = 1;
@@ -324,7 +331,11 @@ static void handle_child_status(pid_t pid, int status) {
}
}
process_t::process_t() {}
process_t::process_t() = default;
void process_t::check_generations_before_launch() {
gens_ = topic_monitor_t::principal().current_generations();
}
job_t::job_t(job_id_t jobid, io_chain_t bio, std::shared_ptr<job_t> parent)
: block_io(std::move(bio)),
@@ -361,194 +372,74 @@ void add_disowned_pgid(pid_t pgid) {
}
}
/// A static value tracking how many SIGCHLDs we have seen, which is used in a heurstic to
/// determine if we should call waitpid() at all in `process_mark_finished_children`.
static volatile process_generation_count_t s_sigchld_generation_cnt = 0;
/// See if any children of a fully constructed job have exited or been killed, and mark them
/// accordingly. We cannot reap just any child that's exited, (as in, `waitpid(-1,…`) since
/// that may reap a pgrp leader that has exited but in a job with another process that has yet to
/// launch and join its pgrp (#5219).
/// \param block_on_fg when true, blocks waiting for the foreground job to finish.
/// \return whether the operation completed without incident
static bool process_mark_finished_children(bool block_on_fg) {
/// See if any reapable processes have exited, and mark them accordingly.
/// \param block_ok if no reapable processes have exited, block until one is (or until we receive a
/// signal).
static void process_mark_finished_children(bool block_ok) {
ASSERT_IS_MAIN_THREAD();
// We can't always use SIGCHLD to determine if waitpid() should be called since it is not
// strictly one-SIGCHLD-per-one-child-exited (i.e. multiple exits can share a SIGCHLD call) and
// we a) return immediately the first time a dead child is reaped, b) explicitly skip over jobs
// that aren't yet fully constructed, so it's possible that we can get SIGCHLD and even find a
// killed child in the jobs we are reaping, but also have an exited child process in a job that
// hasn't been fully constructed yet - which means we can end up never knowing about the exited
// child process in that job if we use SIGCHLD count as the only metric for whether or not
// waitpid() is called.
// Without this optimization, the slowdown caused by calling waitpid() even just once each time
// `process_mark_finished_children()` is called is rather obvious (see the performance-related
// discussion in #5219), making it worth the complexity of this heuristic.
// Get the exit and signal generations of all reapable processes.
// The exit generation tells us if we have an exit; the signal generation allows for detecting
// SIGHUP and SIGINT.
// Get the gen count of all reapable processes.
topic_set_t reaptopics{};
generation_list_t gens{};
gens.fill(invalid_generation);
job_iterator_t jobs;
while (auto *j = jobs.next()) {
for (const auto &proc : j->processes) {
if (auto mtopic = j->reap_topic_for_process(proc.get())) {
topic_t topic = *mtopic;
reaptopics.set(topic);
gens[topic] = std::min(gens[topic], proc->gens_[topic]);
/// Tracks whether or not we received SIGCHLD without checking all jobs (due to jobs under
/// construction), forcing a full waitpid loop.
static bool dirty_state = true;
static process_generation_count_t last_sigchld_count = -1;
// If the last time that we received a SIGCHLD we did not waitpid all jobs, we cannot early out.
if (!dirty_state && last_sigchld_count == s_sigchld_generation_cnt) {
// If we have foreground jobs, we need to block on them below
if (!block_on_fg) {
// We can assume that no children have exited and that all waitpid calls with
// WNOHANG below will confirm that.
return true;
reaptopics.set(topic_t::sighupint);
gens[topic_t::sighupint] =
std::min(gens[topic_t::sighupint], proc->gens_[topic_t::sighupint]);
}
}
}
last_sigchld_count = s_sigchld_generation_cnt;
bool jobs_skipped = false;
bool has_error = false;
job_t *job_fg = nullptr;
if (reaptopics.none()) {
// No reapable processes, nothing to wait for.
return;
}
// Reap only processes belonging to fully-constructed jobs to prevent reaping of processes
// before others in the same process group have a chance to join their pgrp.
job_iterator_t jobs;
while (auto j = jobs.next()) {
// (A job can have pgrp INVALID_PID if it consists solely of builtins that perform no IO)
if (j->pgid == INVALID_PID || !j->is_constructed()) {
debug(5, "Skipping wait on incomplete job %d (%ls)", j->job_id, j->preview().c_str());
jobs_skipped = true;
continue;
}
// Now check for changes, optionally waiting.
auto changed_topics = topic_monitor_t::principal().check(&gens, reaptopics, block_ok);
if (changed_topics.none()) return;
if (j != job_fg && j->is_foreground() && !j->is_stopped() && !j->is_completed()) {
// Ensure that we don't have multiple fully constructed foreground jobs.
assert((!job_fg || !job_fg->job_chain_is_fully_constructed() ||
!j->job_chain_is_fully_constructed()) &&
"More than one active, fully-constructed foreground job!");
job_fg = j;
}
// We got some changes. Since we last checked we received SIGCHLD, and or HUP/INT.
// Update the hup/int generations and reap any reapable processes.
jobs.reset();
while (auto *j = jobs.next()) {
for (const auto &proc : j->processes) {
if (auto mtopic = j->reap_topic_for_process(proc.get())) {
// Update the signal hup/int gen.
proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint];
// Whether we will wait for uncompleted processes depends on the combination of
// `block_on_fg` and the nature of the process. Default is WNOHANG, but if foreground,
// constructed, not stopped, *and* block_on_fg is true, then no WNOHANG (i.e. "HANG").
int options = WUNTRACED | WNOHANG;
// We should never block twice in the same go, as `waitpid()' returning could mean one
// process completed or many, and there is a race condition when calling `waitpid()` after
// the process group exits having reaped all children and terminated the process group and
// when a subsequent call to `waitpid()` for the same process group returns immediately if
// that process group no longer exists. i.e. it's possible for all processes to have exited
// but the process group to remain momentarily valid, in which case calling `waitpid()`
// without WNOHANG can cause an infinite wait. Additionally, only wait on external jobs that
// spawned new process groups (i.e. JOB_CONTROL). We do not break or return on error as we
// wait on only one pgrp at a time and we need to check all pgrps before returning, but we
// never wait/block on fg processes after an error has been encountered to give ourselves
// (elsewhere) a chance to handle the fallout from process termination, etc.
if (!has_error && block_on_fg && j == job_fg) {
debug(4, "Waiting on processes from foreground job %d", job_fg->pgid);
options &= ~WNOHANG;
}
// Child jobs (produced via execution of functions) share job ids with their not-yet-
// fully-constructed parent jobs, so we have to wait on these by individual process id
// and not by the shared pgroup. End result is the same, but it just makes more calls
// to the kernel.
bool wait_by_process = !j->job_chain_is_fully_constructed();
// Firejail can result in jobs with pgroup 0, in which case we cannot wait by
// job id. See discussion in #5295.
if (j->pgid == 0) {
wait_by_process = true;
}
// Cygwin does some voodoo with regards to process management that I do not understand, but
// long story short, we cannot reap processes by their pgroup. The way child processes are
// launched under Cygwin is... weird, and outwardly they do not appear to retain information
// about their parent process when viewed in Task Manager. Waiting on processes by their
// pgroup results in never reaping any, so we just wait on them by process id instead.
if (is_cygwin()) {
wait_by_process = true;
}
// When waiting on processes individually in a pipeline, we need to enumerate in reverse
// order so that the first process we actually wait on (i.e. ~WNOHANG) is the last process
// in the IO chain, because that's the one that controls the lifetime of the foreground job
// - as long as it is still running, we are in the background and once it exits or is
// killed, all previous jobs in the IO pipeline must necessarily terminate as well.
auto process = j->processes.rbegin();
// waitpid(2) returns 1 process each time, we need to keep calling it until we've reaped all
// children of the pgrp in question or else we can't reset the dirty_state flag. In all
// cases, calling waitpid(2) is faster than potentially calling select_try() on a process
// that has exited, which will force us to wait the full timeout before coming back here and
// calling waitpid() again.
while (true) {
int status;
pid_t pid;
if (wait_by_process) {
// If the evaluation of a function resulted in the sharing of a pgroup between the
// real job and the job that shouldn't have been created as a separate job AND the
// parent job is still under construction (which is the case when continue_job() is
// first called on the child job during the recursive call to exec_job() before the
// parent job has been fully constructed), we need to call waitpid(2) on the
// individual processes of the child job instead of using a catch-all waitpid(2)
// call on the job's process group.
if (process == j->processes.rend()) {
break;
}
assert((*process)->pid != INVALID_PID && "Waiting by process on an invalid PID!");
if ((*process)->completed) {
// This process has already been waited on to completion
process++;
continue;
}
if ((options & WNOHANG) == 0) {
debug(4, "Waiting on individual process %d: %ls", (*process)->pid, (*process)->argv0());
} else {
debug(4, "waitpid with WNOHANG on individual process %d", (*process)->pid);
}
pid = waitpid((*process)->pid, &status, options);
process++;
} else {
// A negative PID passed in to `waitpid()` means wait on any child in that process
// group
pid = waitpid(-1 * j->pgid, &status, options);
}
if (pid > 0) {
// A child process has been reaped
debug(4, "Reaped PID %d", pid);
handle_child_status(pid, status);
// Always set WNOHANG (that is, don't hang). Otherwise we might wait on a non-stopped job
// that becomes stopped, but we don't refresh our view of the process state before
// calling waitpid(2) again here.
options |= WNOHANG;
} else if (pid == 0 || errno == ECHILD) {
// No killed/dead children in this particular process group
if (!wait_by_process) {
if ((options & WNOHANG) == 0) {
// This normally implies that the job has completed, but if we try to wait
// on a job that includes a process that changed its own group before we
// enter `waitpid`, we will be waiting forever. See #5596 for such a case.
wait_by_process = true;
continue;
if (proc->gens_[*mtopic] < gens[*mtopic]) {
// Potentially reapable. Update its gen count and try reaping it.
proc->gens_[*mtopic] = gens[*mtopic];
if (proc->internal_proc_) {
// Try reaping an internal process.
if (proc->internal_proc_->exited()) {
proc->status = proc->internal_proc_->get_status();
proc->completed = true;
}
} else if (proc->pid > 0) {
// Try reaping an external process.
int status = -1;
auto pid = waitpid(proc->pid, &status, WNOHANG | WUNTRACED);
if (pid > 0) {
assert(pid == proc->pid && "Unexpcted waitpid() return");
debug(4, "Reaped PID %d", pid);
handle_child_status(pid, status);
}
} else {
assert(0 && "Don't know how to reap this process");
}
break;
}
} else {
// pid < 0 indicates an error. One likely failure is ECHILD (no children), which is
// not an error and is ignored in the branch above. The other likely failure is
// EINTR, which means we got a signal, which is considered an error. We absolutely
// do not break or return on error, as we need to iterate over all constructed jobs
// but we only call waitpid for one pgrp at a time. We do bypass future waits in
// case of error, however.
has_error = true;
// Do not audibly complain on interrupt (see #5293)
if (errno != EINTR) {
wperror(L"waitpid in process_mark_finished_children");
}
break;
}
}
}
@@ -559,28 +450,6 @@ static bool process_mark_finished_children(bool block_on_fg) {
s_disowned_pids.erase(std::remove_if(s_disowned_pids.begin(), s_disowned_pids.end(),
[&status](pid_t pid) { return waitpid(pid, &status, WNOHANG) > 0; }),
s_disowned_pids.end());
// Yes, the below can be collapsed to a single line, but it's worth being explicit about it with
// the comments. Fret not, the compiler will optimize it. (It better!)
if (jobs_skipped) {
// We received SIGCHLD but were not able to definitely say whether or not all children were
// reaped.
dirty_state = true;
} else {
// We can safely assume that no SIGCHLD means we can just return next time around
dirty_state = false;
}
return !has_error;
}
/// This is called from a signal handler. The signal is always SIGCHLD.
void job_handle_signal(int signal, siginfo_t *info, void *context) {
UNUSED(signal);
UNUSED(info);
UNUSED(context);
// This is the only place that this generation count is modified. It's OK if it overflows.
s_sigchld_generation_cnt += 1;
}
/// Given a command like "cat file", truncate it to a reasonable length.
@@ -1053,15 +922,20 @@ void job_t::continue_job(bool send_sigcont) {
}
}
if (is_foreground()) {
if (is_completed()) {
// Set $status only if we are in the foreground and the last process in the job has
// finished and is not a short-circuited builtin.
auto &p = processes.back();
if ((WIFEXITED(p->status) || WIFSIGNALED(p->status)) && p->pid) {
int status = proc_format_status(p->status);
proc_set_last_status(get_flag(job_flag_t::NEGATE) ? !status : status);
}
if (is_foreground() && is_completed()) {
// Set $status only if we are in the foreground and the last process in the job has
// finished and is not a short-circuited builtin.
bool negate = get_flag(job_flag_t::NEGATE);
auto &p = processes.back();
if (p->internal_proc_) {
// Here the status is synthetic - not associated with a real exited process.
// TODO: clean this up, we shouldn't store the process's exit status in an unparsed
// state.
int status = p->status;
proc_set_last_status(negate ? !status : status);
} else if ((WIFEXITED(p->status) || WIFSIGNALED(p->status)) && p->pid) {
int status = proc_format_status(p->status);
proc_set_last_status(negate ? !status : status);
}
}
}

View File

@@ -20,6 +20,7 @@
#include "io.h"
#include "parse_tree.h"
#include "tnode.h"
#include "topic_monitor.h"
/// Types of processes.
enum process_type_t {
@@ -41,6 +42,28 @@ enum {
JOB_CONTROL_NONE,
};
/// A structure representing a "process" internal to fish. This is backed by a pthread instead of a
/// separate process.
class internal_proc_t {
/// Whether the process has exited.
std::atomic<bool> exited_{};
/// If the process has exited, its status code.
std::atomic<int> status_{};
public:
/// \return if this process has exited.
bool exited() const { return exited_.load(std::memory_order_relaxed); }
/// Mark this process as exited, with the given status.
void mark_exited(int status);
int get_status() const {
assert(exited() && "Process is not exited");
return status_.load(std::memory_order_acquire);
}
};
/// A structure representing a single fish process. Contains variables for tracking process state
/// and the process argument list. Actually, a fish process can be either a regular external
/// process, an internal builtin which may or may not spawn a fake IO process during execution, a
@@ -112,10 +135,23 @@ class process_t {
void set_io_chain(const io_chain_t &chain) { this->process_io_chain = chain; }
/// Store the current topic generations. That is, right before the process is launched, record
/// the generations of all topics; then we can tell which generation values have changed after
/// launch. This helps us avoid spurious waitpid calls.
void check_generations_before_launch();
/// Actual command to pass to exec in case of EXTERNAL or INTERNAL_EXEC.
wcstring actual_cmd;
/// Generation counts for reaping.
generation_list_t gens_{};
/// Process ID
pid_t pid{0};
/// If we are an "internal process," that process.
std::shared_ptr<internal_proc_t> internal_proc_{};
/// File descriptor that pipe output should bind to.
int pipe_write_fd{0};
/// True if process has completed.
@@ -153,6 +189,13 @@ enum class job_flag_t {
JOB_CONTROL,
/// Whether the job wants to own the terminal when in the foreground.
TERMINAL,
JOB_FLAG_COUNT
};
template <>
struct enum_info_t<job_flag_t> {
static constexpr auto count = job_flag_t::JOB_FLAG_COUNT;
};
typedef int job_id_t;
@@ -193,6 +236,31 @@ class job_t {
/// Sets the command.
void set_command(const wcstring &cmd) { command_str = cmd; }
/// \return whether it is OK to reap a given process. Sometimes we want to defer reaping a
/// process if it is the group leader and the job is not yet constructed, because then we might
/// also reap the process group and then we cannot add new processes to the group.
bool can_reap(const process_t *p) const {
// Internal processes can always be reaped.
if (p->internal_proc_) {
return true;
} else if (p->pid <= 0) {
// Can't reap without a pid.
return false;
} else if (!is_constructed() && pgid > 0 && p->pid == pgid) {
// p is the the group leader in an under-construction job.
return false;
} else {
return true;
}
}
/// \returns the reap topic for a process, which describes the manner in which we are reaped. A
/// none returns means don't reap, or perhaps defer reaping.
maybe_t<topic_t> reap_topic_for_process(const process_t *p) const {
if (p->completed || !can_reap(p)) return none();
return p->internal_proc_ ? topic_t::internal_exit : topic_t::sigchld;
}
/// Returns a truncated version of the job string. Used when a message has already been emitted
/// containing the full job string and job id, but using the job id alone would be confusing
/// due to reuse of freed job ids. Prevents overloading the debug comments with the full,
@@ -352,9 +420,6 @@ int proc_get_last_status();
/// \param interactive whether interactive jobs should be reaped as well
bool job_reap(bool interactive);
/// Signal handler for SIGCHLD. Mark any processes with relevant information.
void job_handle_signal(int signal, siginfo_t *info, void *con);
/// Mark a process as failed to execute (and therefore completed).
void job_mark_process_as_failed(const std::shared_ptr<job_t> &job, const process_t *p);

View File

@@ -88,3 +88,23 @@ maybe_t<dup2_list_t> dup2_list_t::resolve_chain(const io_chain_t &io_chain) {
}
return {std::move(result)};
}
int dup2_list_t::fd_for_target_fd(int target) const {
// Paranoia.
if (target < 0) {
return target;
}
// Note we can simply walk our action list backwards, looking for src -> target dups.
int cursor = target;
for (auto iter = actions_.rbegin(); iter != actions_.rend(); ++iter) {
if (iter->target == cursor) {
// cursor is replaced by iter->src
cursor = iter->src;
} else if (iter->src == cursor && iter->target < 0) {
// cursor is closed.
cursor = -1;
break;
}
}
return cursor;
}

View File

@@ -60,6 +60,11 @@ class dup2_list_t {
/// The result contains the list of fd actions (dup2 and close), as well as the list
/// of fds opened.
static maybe_t<dup2_list_t> resolve_chain(const io_chain_t &);
/// \return the fd ultimately dup'd to a target fd, or -1 if the target is closed.
/// For example, if target fd is 1, and we have a dup2 chain 5->3 and 3->1, then we will return 5.
/// If the target is not referenced in the chain, returns target.
int fd_for_target_fd(int target) const;
};
#endif

View File

@@ -15,6 +15,7 @@
#include "parser.h"
#include "proc.h"
#include "reader.h"
#include "topic_monitor.h"
#include "wutil.h" // IWYU pragma: keep
/// Struct describing an entry for the lookup table used to convert between signal names and signal
@@ -230,6 +231,7 @@ static void handle_hup(int sig, siginfo_t *info, void *context) {
} else {
reader_exit(1, 1);
}
topic_monitor_t::principal().post(topic_t::sighupint);
}
/// Handle sigterm. The only thing we do is restore the front process ID, then die.
@@ -248,6 +250,7 @@ static void handle_int(int sig, siginfo_t *info, void *context) {
if (reraise_if_forked_child(sig)) return;
reader_handle_sigint();
default_handler(sig, info, context);
topic_monitor_t::principal().post(topic_t::sighupint);
}
/// Non-interactive ^C handler.
@@ -260,8 +263,8 @@ static void handle_int_notinteractive(int sig, siginfo_t *info, void *context) {
/// sigchld handler. Does notification and calls the handler in proc.c.
static void handle_chld(int sig, siginfo_t *info, void *context) {
if (reraise_if_forked_child(sig)) return;
job_handle_signal(sig, info, context);
default_handler(sig, info, context);
topic_monitor_t::principal().post(topic_t::sigchld);
}
// We have a sigalarm handler that does nothing. This is used in the signal torture test, to verify

114
src/topic_monitor.cpp Normal file
View File

@@ -0,0 +1,114 @@
#include "config.h" // IWYU pragma: keep
#include "limits.h"
#include "topic_monitor.h"
#include "wutil.h"
#include <unistd.h>
/// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a
/// pointless at-exit handler for the dtor.
static topic_monitor_t *const s_principal = new topic_monitor_t();
topic_monitor_t &topic_monitor_t::principal() {
// Do not attempt to move s_principal to a function-level static, it needs to be accessed from a
// signal handler so it must not be lazily created.
return *s_principal;
}
topic_monitor_t::topic_monitor_t() {
// Set up our pipes. Assert it succeeds.
auto pipes = make_autoclose_pipes({});
assert(pipes.has_value() && "Failed to make pubsub pipes");
pipes_ = pipes.acquire();
// Make sure that our write side doesn't block, else we risk hanging in a signal handler.
// The read end must block to avoid spinning in await.
DIE_ON_FAILURE(make_fd_nonblocking(pipes_.write.fd()));
}
topic_monitor_t::~topic_monitor_t() = default;
void topic_monitor_t::post(topic_t topic) {
// Beware, we may be in a signal handler!
// Atomically update the pending topics.
auto rawtopics = topic_set_t{topic}.to_raw();
auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed);
if ((oldtopics & rawtopics) == rawtopics) {
// No new bits were set.
return;
}
// Ok, we changed one or more bits. Ensure the topic change is visible, and announce the change
// by writing a byte to the pipe.
std::atomic_thread_fence(std::memory_order_release);
ssize_t ret;
do {
// write() is async signal safe.
const uint8_t v = 0;
ret = write(pipes_.write.fd(), &v, sizeof v);
} while (ret < 0 && errno == EINTR);
// Ignore EAGAIN and other errors (which conceivably could occur during shutdown).
}
void topic_monitor_t::await_metagen(generation_t mgen) {
// Fast check of the metagen before taking the lock. If it's changed we're done.
if (mgen != current_metagen()) return;
// Take the lock (which may take a long time) and then check again.
std::unique_lock<std::mutex> locker{wait_queue_lock_};
if (mgen != current_metagen()) return;
// Our metagen hasn't changed. Push our metagen onto the queue, then wait until we're the
// lowest. If multiple waiters are the lowest, then anyone can be the observer.
// Note the reason for picking the lowest metagen is to avoid a priority inversion where a lower
// metagen (therefore someone who should see changes) is blocked waiting for a higher metagen
// (who has already seen the changes).
wait_queue_.push(mgen);
while (wait_queue_.top() != mgen) {
wait_queue_notifier_.wait(locker);
}
wait_queue_.pop();
// We now have the lowest metagen in the wait queue. Notice we still hold the lock.
// Read until the metagen changes. It may already have changed.
// Note because changes are coalesced, we can read a lot, potentially draining the pipe.
while (mgen == current_metagen()) {
uint8_t ignored[PIPE_BUF];
(void)read(pipes_.read.fd(), ignored, sizeof ignored);
}
// Release the lock and wake up the remaining waiters.
locker.unlock();
wait_queue_notifier_.notify_all();
}
topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) {
if (topics.none()) return topics;
topic_set_t changed{};
for (;;) {
// Load the topic list and see if anything has changed.
generation_list_t current = updated_gens();
for (topic_t topic : topic_iter_t{}) {
if (topics.get(topic)) {
assert(gens->at(topic) <= current.at(topic) &&
"Incoming gen count exceeded published count");
if (gens->at(topic) < current.at(topic)) {
gens->at(topic) = current.at(topic);
changed.set(topic);
}
}
}
// If we're not waiting, or something changed, then we're done.
if (!wait || changed.any()) {
break;
}
// Try again. Note that we use the metagen corresponding to the topic list we just
// inspected, not the current one (which may have updates since we checked).
await_metagen(metagen_for(current));
}
return changed;
}

167
src/topic_monitor.h Normal file
View File

@@ -0,0 +1,167 @@
#ifndef FISH_TOPIC_MONITOR_H
#define FISH_TOPIC_MONITOR_H
#include "common.h"
#include "enum_set.h"
#include "io.h"
#include <array>
#include <atomic>
#include <bitset>
#include <condition_variable>
#include <limits>
#include <numeric>
#include <queue>
/** Topic monitoring support. Topics are conceptually "a thing that can happen." For example,
delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means
that that thing happened.
Associated with each topic is a current generation, which is a 64 bit value. When you query a
topic, you get back a generation. If on the next query the generation has increased, then it
indicates someone posted to the topic.
For example, if you are monitoring a child process, you can query the sigchld topic. If it has
increased since your last query, it is possible that your child process has exited.
Topic postings may be coalesced. That is there may be two posts to a given topic, yet the
generation only increases by 1. The only guarantee is that after a topic post, the current
generation value is larger than any value previously queried.
Tying this all together is the topic_monitor_t. This provides the current topic generations, and
also provides the ability to perform a blocking wait for any topic to change in a particular topic
set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit.
*/
/// The list of topics that may be observed.
enum class topic_t : uint8_t {
sigchld, // Corresponds to SIGCHLD signal.
sighupint, // Corresponds to both SIGHUP and SIGINT signals.
internal_exit, // Corresponds to an internal process exit.
COUNT
};
/// Allow enum_iter to be used.
template <>
struct enum_info_t<topic_t> {
static constexpr auto count = topic_t::COUNT;
};
/// Set of topics.
using topic_set_t = enum_set_t<topic_t>;
/// Counting iterator for topics.
using topic_iter_t = enum_iter_t<topic_t>;
/// A generation is a counter incremented every time the value of a topic changes.
/// It is 64 bit so it will never wrap.
using generation_t = uint64_t;
/// A generation value which is guaranteed to never be set and be larger than any valid generation.
constexpr generation_t invalid_generation = std::numeric_limits<generation_t>::max();
/// List of generation values, indexed by topics.
/// The initial value of a generation is always 0.
using generation_list_t = enum_array_t<generation_t, topic_t>;
/// Teh topic monitor class. This permits querying the current generation values for topics,
/// optionally blocking until they increase.
class topic_monitor_t {
private:
using topic_set_raw_t = uint8_t;
static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count<topic_t>(),
"topic_set_raw is too small");
/// The current topic generation list, protected by a mutex. Note this may be opportunistically
/// updated at the point it is queried.
owning_lock<generation_list_t> current_gen_{{}};
/// The set of topics which have pending increments.
/// This is managed via atomics.
std::atomic<topic_set_raw_t> pending_updates_{};
/// When a topic set is queried in a blocking way, the waiters are put into a queue. The waiter
/// with the smallest metagen is responsible for announcing the change to the rest of the
/// waiters. (The metagen is just the sum of the current generations.) Note that this is a
/// max-heap that defaults to std::less; by using std::greater it becomes a min heap. This is
/// protected by wait_queue_lock_.
std::priority_queue<generation_t, std::vector<generation_t>, std::greater<generation_t>>
wait_queue_;
/// Mutex guarding the wait queue.
std::mutex wait_queue_lock_{};
/// Condition variable for broadcasting notifications.
std::condition_variable wait_queue_notifier_{};
/// Pipes used to communicate changes from the signal handler.
autoclose_pipes_t pipes_;
/// \return the metagen for a topic generation list.
/// The metagen is simply the sum of topic generations. Note it is monotone.
static inline generation_t metagen_for(const generation_list_t &lst) {
return std::accumulate(lst.begin(), lst.end(), generation_t{0});
}
/// Wait for the current metagen to become different from \p gen.
/// If it is already different, return immediately.
void await_metagen(generation_t gen);
/// Return the current generation list, opportunistically applying any pending updates.
generation_list_t updated_gens() {
auto current_gens = current_gen_.acquire();
// Atomically acquire the pending updates, swapping in 0.
// If there are no pending updates (likely), just return.
// Otherwise CAS in 0 and update our topics.
const auto relaxed = std::memory_order_relaxed;
topic_set_raw_t raw;
bool cas_success;
do {
raw = pending_updates_.load(relaxed);
if (raw == 0) return *current_gens;
cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed);
} while (!cas_success);
// Update the current generation with our topics and return it.
auto topics = topic_set_t::from_raw(raw);
for (topic_t topic : topic_iter_t{}) {
current_gens->at(topic) += topics.get(topic) ? 1 : 0;
}
return *current_gens;
}
/// \return the metagen for the current topic generation list.
inline generation_t current_metagen() { return metagen_for(updated_gens()); }
public:
topic_monitor_t();
~topic_monitor_t();
/// topic_monitors should not be copied, and there should be no reason to move one.
void operator=(const topic_monitor_t &) = delete;
topic_monitor_t(const topic_monitor_t &) = delete;
void operator=(topic_monitor_t &&) = delete;
topic_monitor_t(topic_monitor_t &&) = delete;
/// The principal topic_monitor. This may be fetched from a signal handler.
static topic_monitor_t &principal();
/// Post to a topic, potentially from a signal handler.
void post(topic_t topic);
/// Access the current generations.
generation_list_t current_generations() { return updated_gens(); }
/// Access the generation for a topic.
generation_t generation_for_topic(topic_t topic) { return current_generations().at(topic); }
/// See if for any topic (specified in \p topics) has changed from the values in the generation
/// list \p gens. If \p wait is set, then wait if there are no changes; otherwise return
/// immediately.
/// \return the set of topics that changed, updating the generation list \p gens.
topic_set_t check(generation_list_t *gens, topic_set_t topics, bool wait);
};
#endif