From 6ba0d4c88af0e3b34f92763feda6f555c0ce74d8 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Sat, 2 Feb 2019 17:53:40 -0800 Subject: [PATCH] Revert io_bufferfill_t stack This reverts commit 88dc484858f708ef5ac14c928fc5cf7c3192397a onwards. --- CMakeLists.txt | 2 +- Makefile.in | 2 +- share/functions/psub.fish | 5 +- src/common.h | 3 - src/exec.cpp | 268 ++++++++++++++++++++++++-------------- src/exec.h | 4 + src/fish.cpp | 2 +- src/fish_tests.cpp | 53 +------- src/io.cpp | 215 +++++++++--------------------- src/io.h | 171 ++++++++---------------- src/iothread.cpp | 64 ++------- src/iothread.h | 8 +- src/postfork.cpp | 205 ++++++++++++++++++++++++++--- src/postfork.h | 10 +- src/proc.cpp | 144 +++++++++++++++++++- src/redirection.cpp | 92 ------------- src/redirection.h | 64 --------- 17 files changed, 647 insertions(+), 665 deletions(-) delete mode 100644 src/redirection.cpp delete mode 100644 src/redirection.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 809f24162..882888c7a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,7 +77,7 @@ SET(FISH_SRCS src/postfork.cpp src/proc.cpp src/reader.cpp src/sanity.cpp src/screen.cpp src/signal.cpp src/tinyexpr.cpp src/tnode.cpp src/tokenizer.cpp src/utf8.cpp src/util.cpp src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp - src/future_feature_flags.cpp src/redirection.cpp + src/future_feature_flags.cpp ) # Header files are just globbed. diff --git a/Makefile.in b/Makefile.in index 059e05847..aa5e0e2ee 100644 --- a/Makefile.in +++ b/Makefile.in @@ -119,7 +119,7 @@ FISH_OBJS := obj/autoload.o obj/builtin.o obj/builtin_bg.o obj/builtin_bind.o ob obj/parser_keywords.o obj/path.o obj/postfork.o obj/proc.o obj/reader.o \ obj/sanity.o obj/screen.o obj/signal.o obj/tinyexpr.o obj/tokenizer.o obj/tnode.o obj/utf8.o \ obj/util.o obj/wcstringutil.o obj/wgetopt.o obj/wildcard.o obj/wutil.o \ - obj/future_feature_flags.o obj/redirection.o + obj/future_feature_flags.o FISH_INDENT_OBJS := obj/fish_indent.o obj/print_help.o $(FISH_OBJS) diff --git a/share/functions/psub.fish b/share/functions/psub.fish index 8c8a7eac8..b764d6f35 100644 --- a/share/functions/psub.fish +++ b/share/functions/psub.fish @@ -29,10 +29,7 @@ function psub --description "Read from stdin into a file and output the filename or return set filename $dirname/psub.fifo"$_flag_suffix" mkfifo $filename - # Note that if we were to do the obvious `cat >$filename &`, we would deadlock - # because $filename may be opened before the fork. Use tee to ensure it is opened - # after the fork. - tee $filename >/dev/null & + cat >$filename & else if test -z "$_flag_suffix" set filename (mktemp $tmpdir/.psub.XXXXXXXXXX) cat >$filename diff --git a/src/common.h b/src/common.h index 5fa2119ba..8b45a808c 100644 --- a/src/common.h +++ b/src/common.h @@ -786,9 +786,6 @@ class autoclose_fd_t { fd_ = fd; } - // \return if this has a valid fd. - bool valid() const { return fd_ >= 0; } - autoclose_fd_t(const autoclose_fd_t &) = delete; void operator=(const autoclose_fd_t &) = delete; autoclose_fd_t(autoclose_fd_t &&rhs) : fd_(rhs.fd_) { rhs.fd_ = -1; } diff --git a/src/exec.cpp b/src/exec.cpp index eb1a88259..75af77921 100644 --- a/src/exec.cpp +++ b/src/exec.cpp @@ -39,7 +39,6 @@ #include "postfork.h" #include "proc.h" #include "reader.h" -#include "redirection.h" #include "signal.h" #include "wutil.h" // IWYU pragma: keep @@ -81,6 +80,25 @@ void exec_close(int fd) { } } +int exec_pipe(int fd[2]) { + ASSERT_IS_MAIN_THREAD(); + + int res; + while ((res = pipe(fd))) { + if (errno != EINTR) { + return res; // caller will call wperror + } + } + + debug(4, L"Created pipe using fds %d and %d", fd[0], fd[1]); + + // Pipes ought to be cloexec. Pipes are dup2'd the corresponding fds; the resulting fds are not + // cloexec. + set_cloexec(fd[0]); + set_cloexec(fd[1]); + return res; +} + /// Returns true if the redirection is a file redirection to a file other than /dev/null. static bool redirection_is_to_real_file(const io_data_t *io) { bool result = false; @@ -96,6 +114,18 @@ static bool redirection_is_to_real_file(const io_data_t *io) { return result; } +static bool chain_contains_redirection_to_real_file(const io_chain_t &io_chain) { + bool result = false; + for (size_t idx = 0; idx < io_chain.size(); idx++) { + const io_data_t *io = io_chain.at(idx).get(); + if (redirection_is_to_real_file(io)) { + result = true; + break; + } + } + return result; +} + /// Returns the interpreter for the specified script. Returns NULL if file is not a script with a /// shebang. char *get_interpreter(const char *command, char *interpreter, size_t buff_size) { @@ -227,8 +257,8 @@ static bool io_transmogrify(const io_chain_t &in_chain, io_chain_t *out_chain, switch (in->io_mode) { case io_mode_t::pipe: - case io_mode_t::bufferfill: case io_mode_t::fd: + case io_mode_t::buffer: case io_mode_t::close: { // These redirections don't need transmogrification. They can be passed through. out = in; @@ -299,9 +329,11 @@ void internal_exec_helper(parser_t &parser, parsed_source_ref_t parsed_source, t job_reap(false); } -// Returns whether we can use posix spawn for a given process in a given job. +// Returns whether we can use posix spawn for a given process in a given job. Per +// https://github.com/fish-shell/fish-shell/issues/364 , error handling for file redirections is too +// difficult with posix_spawn, so in that case we use fork/exec. // -// To avoid the race between the caller calling tcsetpgrp() and the client checking the +// Furthermore, to avoid the race between the caller calling tcsetpgrp() and the client checking the // foreground process group, we don't use posix_spawn if we're going to foreground the process. (If // we use fork(), we can call tcsetpgrp after the fork, before the exec, and avoid the race). static bool can_use_posix_spawn_for_job(const std::shared_ptr &job, @@ -315,7 +347,15 @@ static bool can_use_posix_spawn_for_job(const std::shared_ptr &job, return false; } } - return true; + + // Now see if we have a redirection involving a file. The only one we allow is /dev/null, which + // we assume will not fail. + bool result = true; + if (chain_contains_redirection_to_real_file(job->block_io_chain()) || + chain_contains_redirection_to_real_file(process->io_chain())) { + result = false; + } + return result; } void internal_exec(env_stack_t &vars, job_t *j, const io_chain_t &all_ios) { @@ -327,8 +367,7 @@ void internal_exec(env_stack_t &vars, job_t *j, const io_chain_t &all_ios) { // It's known to be wrong - for example, it means that redirections bound for subsequent // commands in the pipeline will apply to exec. However, using exec in a pipeline doesn't // really make sense, so I'm not trying to fix it here. - auto redirs = dup2_list_t::resolve_chain(all_ios); - if (redirs && !setup_child_process(0, *redirs)) { + if (!setup_child_process(0, all_ios)) { // Decrement SHLVL as we're removing ourselves from the shell "stack". auto shlvl_var = vars.get(L"SHLVL", ENV_GLOBAL | ENV_EXPORT); wcstring shlvl_str = L"0"; @@ -365,7 +404,7 @@ static void on_process_created(const std::shared_ptr &j, pid_t child_pid) /// Call fork() as part of executing a process \p p in a job \j. Execute \p child_action in the /// context of the child. Returns true if fork succeeded, false if fork failed. static bool fork_child_for_process(const std::shared_ptr &job, process_t *p, - const dup2_list_t &dup2s, bool drain_threads, + const io_chain_t &io_chain, bool drain_threads, const char *fork_type, const std::function &child_action) { pid_t pid = execute_fork(drain_threads); @@ -374,7 +413,7 @@ static bool fork_child_for_process(const std::shared_ptr &job, process_t // stdout and stderr, and then exit. p->pid = getpid(); child_set_group(job.get(), p); - setup_child_process(p, dup2s); + setup_child_process(p, io_chain); child_action(); DIE("Child process returned control to fork_child lambda!"); } @@ -405,12 +444,12 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptrtype == INTERNAL_BUILTIN && "Process must be a builtin"); int local_builtin_stdin = STDIN_FILENO; - autoclose_fd_t locally_opened_stdin{}; + bool close_stdin = false; // If this is the first process, check the io redirections and see where we should // be reading from. if (pipe_read) { - local_builtin_stdin = pipe_read->pipe_fd(); + local_builtin_stdin = pipe_read->pipe_fd[0]; } else if (const auto in = proc_io_chain.get_io_for_fd(STDIN_FILENO)) { switch (in->io_mode) { case io_mode_t::fd: { @@ -429,20 +468,20 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptr(in.get()); - if (in_pipe->fd == STDIN_FILENO) { - local_builtin_stdin = in_pipe->pipe_fd(); - } + local_builtin_stdin = in_pipe->pipe_fd[0]; break; } case io_mode_t::file: { + // Do not set CLO_EXEC because child needs access. const io_file_t *in_file = static_cast(in.get()); - locally_opened_stdin = - autoclose_fd_t{open(in_file->filename_cstr, in_file->flags, OPEN_MASK)}; - if (!locally_opened_stdin.valid()) { + local_builtin_stdin = open(in_file->filename_cstr, in_file->flags, OPEN_MASK); + if (local_builtin_stdin == -1) { debug(1, FILE_ERROR, in_file->filename_cstr); wperror(L"open"); + } else { + close_stdin = true; } - local_builtin_stdin = locally_opened_stdin.fd(); + break; } case io_mode_t::close: { @@ -498,6 +537,10 @@ static bool exec_internal_builtin_proc(parser_t &parser, const std::shared_ptrset_flag(job_flag_t::FOREGROUND, fg); + // If stdin has been redirected, close the redirection stream. + if (close_stdin) { + exec_close(local_builtin_stdin); + } return true; // "success" } @@ -525,11 +568,7 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, if (!must_fork && p->is_last_in_job) { // We are handling reads directly in the main loop. Note that we may still end // up forking. - const bool stdout_is_bufferfill = - (stdout_io && stdout_io->io_mode == io_mode_t::bufferfill); - const std::shared_ptr stdout_buffer = - stdout_is_bufferfill ? static_cast(stdout_io.get())->buffer() - : nullptr; + const bool stdout_is_to_buffer = stdout_io && stdout_io->io_mode == io_mode_t::buffer; const bool no_stdout_output = stdout_stream.empty(); const bool no_stderr_output = stderr_stream.empty(); const bool stdout_discarded = stdout_stream.buffer().discarded(); @@ -539,7 +578,7 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, // need to fork or even output anything. debug(4, L"Skipping fork: no output for internal builtin '%ls'", p->argv0()); fork_was_skipped = true; - } else if (no_stderr_output && stdout_buffer) { + } else if (no_stderr_output && stdout_is_to_buffer) { // The builtin produced no stderr, and its stdout is going to an // internal buffer. There is no need to fork. This helps out the // performance quite a bit in complex completion code. @@ -551,7 +590,8 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, // also produce stderr. debug(4, L"Skipping fork: buffered output for internal builtin '%ls'", p->argv0()); - stdout_buffer->append_from_stream(stdout_stream); + io_buffer_t *io_buffer = static_cast(stdout_io.get()); + io_buffer->append_from_stream(stdout_stream); fork_was_skipped = true; } else if (stdout_io.get() == NULL && stderr_io.get() == NULL) { // We are writing to normal stdout and stderr. Just do it - no need to fork. @@ -594,15 +634,9 @@ static bool handle_builtin_output(const std::shared_ptr &j, process_t *p, const char *errbuff = errbuff_str.data(); size_t errbuff_len = errbuff_str.size(); - // Resolve our IO chain to a sequence of dup2s. - auto dup2s = dup2_list_t::resolve_chain(*io_chain); - if (!dup2s) { - return false; - } - fflush(stdout); fflush(stderr); - if (!fork_child_for_process(j, p, *dup2s, false, "internal builtin", [&] { + if (!fork_child_for_process(j, p, *io_chain, false, "internal builtin", [&] { do_builtin_io(outbuff, outbuff_len, errbuff, errbuff_len); exit_without_destructors(p->status); })) { @@ -621,11 +655,6 @@ static bool exec_external_command(env_stack_t &vars, const std::shared_ptr argv_array; convert_wide_array_to_narrow(p->get_argv_array(), &argv_array); - // Convert our IO chain to a dup2 sequence. - auto dup2s = dup2_list_t::resolve_chain(proc_io_chain); - if (! dup2s) - return false; - // Ensure that stdin is blocking before we hand it off (see issue #176). It's a // little strange that we only do this with stdin and not with stdout or stderr. // However in practice, setting or clearing O_NONBLOCK on stdin also sets it for the @@ -649,7 +678,8 @@ static bool exec_external_command(env_stack_t &vars, const std::shared_ptr "Unexpected process type"); // Create an output buffer if we're piping to another process. - shared_ptr block_output_bufferfill{}; + shared_ptr block_output_io_buffer{}; if (!p->is_last_in_job) { // Be careful to handle failure, e.g. too many open fds. - block_output_bufferfill = io_bufferfill_t::create(user_ios); - if (!block_output_bufferfill) { + block_output_io_buffer = io_buffer_t::create(STDOUT_FILENO, user_ios); + if (!block_output_io_buffer) { job_mark_process_as_failed(j, p); return false; + } else { + // This looks sketchy, because we're adding this io buffer locally - they + // aren't in the process or job redirection list. Therefore select_try won't + // be able to read them. However we call block_output_io_buffer->read() + // below, which reads until EOF. So there's no need to select on this. + io_chain.push_back(block_output_io_buffer); } - // Teach the job about its bufferfill, and add it to our io chain. - io_chain.push_back(block_output_bufferfill); } if (p->type == INTERNAL_FUNCTION) { @@ -768,8 +802,10 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr int status = proc_get_last_status(); - // If we have a block output buffer, populate it now. - if (!block_output_bufferfill) { + // Handle output from a block or function. This usually means do nothing, but in the + // case of pipes, we have to buffer such io, since otherwise the internal pipe + // buffer might overflow. + if (!block_output_io_buffer.get()) { // No buffer, so we exit directly. This means we have to manually set the exit // status. if (p->is_last_in_job) { @@ -778,28 +814,21 @@ static bool exec_block_or_func_process(parser_t &parser, std::shared_ptr p->completed = 1; return true; } - assert(block_output_bufferfill && "Must have a block output bufferfiller"); - // Remove our write pipe and forget it. This may close the pipe, unless another thread has - // claimed it (background write) or another process has inherited it. - io_chain.remove(block_output_bufferfill); - auto block_output_buffer = io_bufferfill_t::finish(std::move(block_output_bufferfill)); + // Here we must have a non-NULL block_output_io_buffer. + assert(block_output_io_buffer.get() != NULL); + io_chain.remove(block_output_io_buffer); + block_output_io_buffer->read(); - // Resolve our IO chain to a sequence of dup2s. - auto dup2s = dup2_list_t::resolve_chain(io_chain); - if (!dup2s) { - return false; - } - - const std::string buffer_contents = block_output_buffer->buffer().newline_serialized(); + const std::string buffer_contents = block_output_io_buffer->buffer().newline_serialized(); const char *buffer = buffer_contents.data(); size_t count = buffer_contents.size(); if (count > 0) { // We don't have to drain threads here because our child process is simple. const char *fork_reason = p->type == INTERNAL_BLOCK_NODE ? "internal block io" : "internal function io"; - if (!fork_child_for_process(j, p, *dup2s, false, fork_reason, [&] { - exec_write_and_exit(STDOUT_FILENO, buffer, count, status); + if (!fork_child_for_process(j, p, io_chain, false, fork_reason, [&] { + exec_write_and_exit(block_output_io_buffer->fd, buffer, count, status); })) { return false; } @@ -819,28 +848,19 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< autoclose_fd_t pipe_current_read, autoclose_fd_t *out_pipe_next_read, const io_chain_t &all_ios, size_t stdout_read_limit) { - // The pipe this command will write to (if any). - shared_ptr pipe_write; - // The pipe this command will read from (if any). - shared_ptr pipe_read; + // The IO chain for this process. It starts with the block IO, then pipes, and then gets any + // from the process. + io_chain_t process_net_io_chain = j->block_io_chain(); - // See if we need a pipe for the next command. + // See if we need a pipe. const bool pipes_to_next_command = !p->is_last_in_job; - if (pipes_to_next_command) { - // Construct our pipes. - auto local_pipes = make_autoclose_pipes(all_ios); - if (!local_pipes) { - debug(1, PIPE_ERROR); - wperror(L"pipe"); - job_mark_process_as_failed(j, p); - return false; - } - pipe_write = std::make_shared(p->pipe_write_fd, false /* not input */, - std::move(local_pipes->write)); - *out_pipe_next_read = std::move(local_pipes->read); - } + // The write end of any pipe we create. + autoclose_fd_t pipe_current_write{}; + // The pipes the current process write to and read from. Unfortunately these can't be just + // allocated on the stack, since j->io wants shared_ptr. + // // The write pipe (destined for stdout) needs to occur before redirections. For example, // with a redirection like this: // @@ -868,10 +888,12 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< // // which depends on the redirection being evaluated before the pipe. So the write end of the // pipe comes first, the read pipe of the pipe comes last. See issue #966. + shared_ptr pipe_write; + shared_ptr pipe_read; - // The IO chain for this process. - io_chain_t process_net_io_chain = j->block_io_chain(); - if (pipe_write) { + // Write pipe goes first. + if (pipes_to_next_command) { + pipe_write.reset(new io_pipe_t(p->pipe_write_fd, false)); process_net_io_chain.push_back(pipe_write); } @@ -879,9 +901,10 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< process_net_io_chain.append(p->io_chain()); // Read pipe goes last. - if (pipe_current_read.valid()) { - pipe_read = std::make_shared(STDIN_FILENO, true /* input */, - std::move(pipe_current_read)); + if (!p->is_first_in_job) { + pipe_read.reset(new io_pipe_t(STDIN_FILENO, true)); + // Record the current read in pipe_read. + pipe_read->pipe_fd[0] = pipe_current_read.fd(); process_net_io_chain.push_back(pipe_read); } @@ -899,6 +922,36 @@ static bool exec_process_in_job(parser_t &parser, process_t *p, std::shared_ptr< parser.vars().export_arr(); } + // Set up fds that will be used in the pipe. + if (pipes_to_next_command) { + // debug( 1, L"%ls|%ls" , p->argv[0], p->next->argv[0]); + int local_pipe[2] = {-1, -1}; + if (exec_pipe(local_pipe) == -1) { + debug(1, PIPE_ERROR); + wperror(L"pipe"); + job_mark_process_as_failed(j, p); + return false; + } + + // Ensure our pipe fds not conflict with any fd redirections. E.g. if the process is + // like 'cat <&5' then fd 5 must not be used by the pipe. + if (!pipe_avoid_conflicts_with_io_chain(local_pipe, all_ios)) { + // We failed. The pipes were closed for us. + wperror(L"dup"); + job_mark_process_as_failed(j, p); + return false; + } + + // This tells the redirection about the fds, but the redirection does not close them. + assert(local_pipe[0] >= 0); + assert(local_pipe[1] >= 0); + memcpy(pipe_write->pipe_fd, local_pipe, sizeof(int) * 2); + + // Record our pipes. + pipe_current_write.reset(local_pipe[1]); + out_pipe_next_read->reset(local_pipe[0]); + } + // Execute the process. switch (p->type) { case INTERNAL_FUNCTION: @@ -959,13 +1012,18 @@ bool exec_job(parser_t &parser, shared_ptr j) { } } + // Verify that all io_mode_t::buffers are output. We used to support a (single, hacked-in) + // magical input io_mode_t::buffer used by fish_pager, but now the claim is that there are no + // more clients and it is removed. This assertion double-checks that. size_t stdout_read_limit = 0; const io_chain_t all_ios = j->all_io_redirections(); - for (auto &io : all_ios) { - if ((io->io_mode == io_mode_t::bufferfill)) { - // The read limit is dictated by the last bufferfill. - const auto *bf = static_cast(io.get()); - stdout_read_limit = bf->buffer()->read_limit(); + for (size_t idx = 0; idx < all_ios.size(); idx++) { + const shared_ptr &io = all_ios.at(idx); + + if ((io->io_mode == io_mode_t::buffer)) { + io_buffer_t *io_buffer = static_cast(io.get()); + assert(!io_buffer->is_input); + stdout_read_limit = io_buffer->buffer().limit(); } } @@ -974,6 +1032,21 @@ bool exec_job(parser_t &parser, shared_ptr j) { DIE("this should be unreachable"); } + // We may have block IOs that conflict with fd redirections. For example, we may have a command + // with a redireciton like <&3; we may also have chosen 3 as the fd for our pipe. Ensure we have + // no conflicts. + for (const auto io : all_ios) { + if (io->io_mode == io_mode_t::buffer) { + auto *io_buffer = static_cast(io.get()); + if (!io_buffer->avoid_conflicts_with_io_chain(all_ios)) { + // We could not avoid conflicts, probably due to fd exhaustion. Mark an error. + exec_error = true; + job_mark_process_as_failed(j, j->processes.front().get()); + break; + } + } + } + // This loop loops over every process_t in the job, starting it as appropriate. This turns out // to be rather complex, since a process_t can be one of many rather different things. // @@ -1029,28 +1102,29 @@ static int exec_subshell_internal(const wcstring &cmd, parser_t &parser, wcstrin // IO buffer creation may fail (e.g. if we have too many open files to make a pipe), so this may // be null. - size_t read_limit = is_subcmd ? read_byte_limit : 0; - std::shared_ptr buffer; - if (auto bufferfill = io_bufferfill_t::create(io_chain_t{}, read_limit)) { + const shared_ptr io_buffer( + io_buffer_t::create(STDOUT_FILENO, io_chain_t(), is_subcmd ? read_byte_limit : 0)); + if (io_buffer.get() != NULL) { parser_t &parser = parser_t::principal_parser(); - if (parser.eval(cmd, io_chain_t{bufferfill}, SUBST) == 0) { + if (parser.eval(cmd, io_chain_t{io_buffer}, SUBST) == 0) { subcommand_status = proc_get_last_status(); } - buffer = io_bufferfill_t::finish(std::move(bufferfill)); + + io_buffer->read(); } - if (buffer && buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH; + if (io_buffer->buffer().discarded()) subcommand_status = STATUS_READ_TOO_MUCH; // If the caller asked us to preserve the exit status, restore the old status. Otherwise set the // status of the subcommand. proc_set_last_status(apply_exit_status ? subcommand_status : prev_status); is_subshell = prev_subshell; - if (lst == NULL || !buffer) { + if (lst == NULL || io_buffer.get() == NULL) { return subcommand_status; } // Walk over all the elements. - for (const auto &elem : buffer->buffer().elements()) { + for (const auto &elem : io_buffer->buffer().elements()) { if (elem.is_explicitly_separated()) { // Just append this one. lst->push_back(str2wcstring(elem.contents)); diff --git a/src/exec.h b/src/exec.h index 63b78abc7..010de4654 100644 --- a/src/exec.h +++ b/src/exec.h @@ -31,6 +31,10 @@ int exec_subshell(const wcstring &cmd, parser_t &parser, bool preserve_exit_stat /// Loops over close until the syscall was run without being interrupted. void exec_close(int fd); +/// Call pipe(), and add resulting fds to open_fds, the list of opened file descriptors for pipes. +/// The pipes are marked CLO_EXEC. +int exec_pipe(int fd[2]); + /// Gets the interpreter for a given command. char *get_interpreter(const char *command, char *interpreter, size_t buff_size); diff --git a/src/fish.cpp b/src/fish.cpp index fc798bf4a..ceb49c053 100644 --- a/src/fish.cpp +++ b/src/fish.cpp @@ -383,7 +383,7 @@ int main(int argc, char **argv) { parser_t &parser = parser_t::principal_parser(); - const io_chain_t empty_ios {}; + const io_chain_t empty_ios; if (read_init(paths)) { // Stomp the exit status of any initialization commands (issue #635). proc_set_last_status(STATUS_CMD_OK); diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index cecb566f3..1760944a7 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -64,7 +64,6 @@ #include "path.h" #include "proc.h" #include "reader.h" -#include "redirection.h" #include "screen.h" #include "signal.h" #include "tnode.h" @@ -678,20 +677,6 @@ static void test_iothread() { max_achieved_thread_count); } -static void test_pthread() { - say(L"Testing pthreads"); - pthread_t result = {}; - int val = 3; - bool made = make_pthread(&result, [&val](){ - val += 2; - }); - do_test(made); - void *ignore = nullptr; - int ret = pthread_join(result, &ignore); - do_test(ret == 0); - do_test(val == 5); -} - static parser_test_error_bits_t detect_argument_errors(const wcstring &src) { parse_node_tree_t tree; if (!parse_tree_from_string(src, parse_flag_none, &tree, NULL, symbol_argument_list)) { @@ -934,7 +919,8 @@ static void test_parser() { } static void test_1_cancellation(const wchar_t *src) { - auto filler = io_bufferfill_t::create(io_chain_t{}); + shared_ptr out_buff(io_buffer_t::create(STDOUT_FILENO, io_chain_t())); + const io_chain_t io_chain{out_buff}; pthread_t thread = pthread_self(); double delay = 0.25 /* seconds */; iothread_perform([=]() { @@ -942,11 +928,11 @@ static void test_1_cancellation(const wchar_t *src) { usleep(delay * 1E6); pthread_kill(thread, SIGINT); }); - parser_t::principal_parser().eval(src, io_chain_t{filler}, TOP); - auto buffer = io_bufferfill_t::finish(std::move(filler)); - if (buffer->buffer().size() != 0) { + parser_t::principal_parser().eval(src, io_chain, TOP); + out_buff->read(); + if (out_buff->buffer().size() != 0) { err(L"Expected 0 bytes in out_buff, but instead found %lu bytes\n", - buffer->buffer().size()); + out_buff->buffer().size()); } iothread_drain_all(); } @@ -2353,31 +2339,6 @@ static void test_wcstod() { tod_test(L"nope", "nope"); } -static void test_dup2s() { - using std::make_shared; - io_chain_t chain; - chain.push_back(make_shared(17)); - chain.push_back(make_shared(3, 19, true)); - auto list = dup2_list_t::resolve_chain(chain); - do_test(list.has_value()); - do_test(list->get_actions().size() == 2); - - auto act1 = list->get_actions().at(0); - do_test(act1.src == 17); - do_test(act1.target == -1); - - auto act2 = list->get_actions().at(1); - do_test(act2.src == 19); - do_test(act2.target == 3); - - // Invalid files should fail to open. - // Suppress the debug() message. - scoped_push saved_debug_level(&debug_level, -1); - chain.push_back(make_shared(2, L"/definitely/not/a/valid/path/for/this/test", 0666)); - list = dup2_list_t::resolve_chain(chain); - do_test(!list.has_value()); -} - /// Testing colors. static void test_colors() { say(L"Testing colors"); @@ -5095,7 +5056,6 @@ int main(int argc, char **argv) { if (should_test_function("convert_nulls")) test_convert_nulls(); if (should_test_function("tok")) test_tokenizer(); if (should_test_function("iothread")) test_iothread(); - if (should_test_function("pthread")) test_pthread(); if (should_test_function("parser")) test_parser(); if (should_test_function("cancellation")) test_cancellation(); if (should_test_function("indents")) test_indents(); @@ -5111,7 +5071,6 @@ int main(int argc, char **argv) { if (should_test_function("abbreviations")) test_abbreviations(); if (should_test_function("test")) test_test(); if (should_test_function("wcstod")) test_wcstod(); - if (should_test_function("dup2s")) test_dup2s(); if (should_test_function("path")) test_path(); if (should_test_function("pager_navigation")) test_pager_navigation(); if (should_test_function("pager_layout")) test_pager_layout(); diff --git a/src/io.cpp b/src/io.cpp index 51ce05e89..5cc0b733c 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -11,7 +11,6 @@ #include "exec.h" #include "fallback.h" // IWYU pragma: keep #include "io.h" -#include "iothread.h" #include "wutil.h" // IWYU pragma: keep io_data_t::~io_data_t() = default; @@ -23,13 +22,16 @@ void io_fd_t::print() const { fwprintf(stderr, L"FD map %d -> %d\n", old_fd, fd) void io_file_t::print() const { fwprintf(stderr, L"file (%s)\n", filename_cstr); } void io_pipe_t::print() const { - fwprintf(stderr, L"pipe {%d} (input: %s)\n", pipe_fd(), is_input_ ? "yes" : "no"); + fwprintf(stderr, L"pipe {%d, %d} (input: %s)\n", pipe_fd[0], pipe_fd[1], + is_input ? "yes" : "no"); } -void io_bufferfill_t::print() const { fwprintf(stderr, L"bufferfill {%d}\n", write_fd_.fd()); } +void io_buffer_t::print() const { + fwprintf(stderr, L"buffer (input: %s, size %lu)\n", + is_input ? "yes" : "no", (unsigned long)buffer_.size()); +} void io_buffer_t::append_from_stream(const output_stream_t &stream) { - scoped_lock locker(append_lock_); if (buffer_.discarded()) return; if (stream.buffer().discarded()) { buffer_.set_discard(); @@ -38,143 +40,74 @@ void io_buffer_t::append_from_stream(const output_stream_t &stream) { buffer_.append_wide_buffer(stream.buffer()); } -void io_buffer_t::run_background_fillthread(autoclose_fd_t readfd) { - // Here we are running the background fillthread, executing in a background thread. - // Our plan is: - // 1. poll via select() until the fd is readable. - // 2. Acquire the append lock. - // 3. read until EAGAIN (would block), appending - // 4. release the lock - // The purpose of holding the lock around the read calls is to ensure that data from background - // processes isn't weirdly interspersed with data directly transferred (from a builtin to a buffer). +void io_buffer_t::read() { + exec_close(pipe_fd[1]); - const int fd = readfd.fd(); - - // 100 msec poll rate. Note that in most cases, the write end of the pipe will be closed so - // select() will return; the polling is important only for weird cases like a background process - // launched in a command substitution. - const long poll_timeout_usec = 100000; - struct timeval tv = {}; - tv.tv_usec = poll_timeout_usec; - - bool shutdown = false; - while (!shutdown) { - bool readable = false; - // Check the shutdown flag. - shutdown |= this->shutdown_fillthread_.load(std::memory_order_relaxed); - - // Poll if our fd is readable. - // Do this even if the shutdown flag is set. It's important we wait for the fd at least - // once. For short-lived processes, it's possible for the process to execute, produce output - // (fits in the pipe buffer) and be reaped before we are even scheduled. So always wait at - // least once on the fd. Note that doesn't mean we will wait for the full poll duration; - // typically what will happen is our pipe will be widowed and so this will return quickly. - // It's only for weird cases (e.g. a background process launched inside a command - // substitution) that we'll wait out the entire poll time. - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); - int ret = select(fd + 1, &fds, NULL, NULL, &tv); - readable = ret > 0; - if (ret < 0 && errno != EINTR) { - // Surprising error. - wperror(L"select"); - return; - } - - if (readable || shutdown) { - // Now either our fd is readable, or we have set the shutdown flag. - // Either way acquire the lock and read until we reach EOF, or EAGAIN / EINTR. - scoped_lock locker(append_lock_); - ssize_t ret; - do { - char buff[4096]; - ret = read(fd, buff, sizeof buff); - if (ret > 0) { - buffer_.append(&buff[0], &buff[ret]); - } else if (ret == 0) { - shutdown = true; - } else if (errno != EINTR && errno != EAGAIN) { - wperror(L"read"); - return; + if (io_mode == io_mode_t::buffer) { + debug(4, L"io_buffer_t::read: blocking read on fd %d", pipe_fd[0]); + while (1) { + char b[4096]; + long len = read_blocked(pipe_fd[0], b, 4096); + if (len == 0) { + break; + } else if (len < 0) { + // exec_read_io_buffer is only called on jobs that have exited, and will therefore + // never block. But a broken pipe seems to cause some flags to reset, causing the + // EOF flag to not be set. Therefore, EAGAIN is ignored and we exit anyway. + if (errno != EAGAIN) { + const wchar_t *fmt = + _(L"An error occured while reading output from code block on fd %d"); + debug(1, fmt, pipe_fd[0]); + wperror(L"io_buffer_t::read"); } - } while (ret > 0); + + break; + } else { + buffer_.append(&b[0], &b[len]); + } } } - assert(shutdown && "Should only exit loop if shutdown flag is set"); } -void io_buffer_t::begin_background_fillthread(autoclose_fd_t fd) { - ASSERT_IS_MAIN_THREAD(); - assert(!fillthread_ && "Already have a fillthread"); - - // We want our background thread to own the fd but it's not easy to move into a std::function. - // Use a shared_ptr. - auto fdref = std::make_shared(std::move(fd)); - - // Our function to read until the receiver is closed. - // It's OK to capture 'this' by value because 'this' owns the background thread and joins it - // before dtor. - std::function func = [this, fdref]() { - this->run_background_fillthread(std::move(*fdref)); - }; - - pthread_t fillthread{}; - if (!make_pthread(&fillthread, std::move(func))) { - wperror(L"make_pthread"); +bool io_buffer_t::avoid_conflicts_with_io_chain(const io_chain_t &ios) { + bool result = pipe_avoid_conflicts_with_io_chain(this->pipe_fd, ios); + if (!result) { + wperror(L"dup"); } - fillthread_ = fillthread; + return result; } -void io_buffer_t::complete_background_fillthread() { - ASSERT_IS_MAIN_THREAD(); - assert(fillthread_ && "Should have a fillthread"); - shutdown_fillthread_.store(true, std::memory_order_relaxed); - void *ignored = nullptr; - int err = pthread_join(*fillthread_, &ignored); - DIE_ON_FAILURE(err); - fillthread_.reset(); -} +shared_ptr io_buffer_t::create(int fd, const io_chain_t &conflicts, + size_t buffer_limit) { + bool success = true; + assert(fd >= 0); + shared_ptr buffer_redirect(new io_buffer_t(fd, buffer_limit)); -shared_ptr io_bufferfill_t::create(const io_chain_t &conflicts, - size_t buffer_limit) { - // Construct our pipes. - auto pipes = make_autoclose_pipes(conflicts); - if (!pipes) { - return nullptr; - } - // Our buffer will read from the read end of the pipe. This end must be non-blocking. This is - // because our fillthread needs to poll to decide if it should shut down, and also accept input - // from direct buffer transfers. - if (make_fd_nonblocking(pipes->read.fd())) { + if (exec_pipe(buffer_redirect->pipe_fd) == -1) { + debug(1, PIPE_ERROR); + wperror(L"pipe"); + success = false; + } else if (!buffer_redirect->avoid_conflicts_with_io_chain(conflicts)) { + // The above call closes the fds on error. + success = false; + } else if (make_fd_nonblocking(buffer_redirect->pipe_fd[0]) != 0) { debug(1, PIPE_ERROR); wperror(L"fcntl"); - return nullptr; + success = false; } - // Our fillthread gets the read end of the pipe; out_pipe gets the write end. - auto buffer = std::make_shared(buffer_limit); - buffer->begin_background_fillthread(std::move(pipes->read)); - return std::make_shared(std::move(pipes->write), buffer); + + if (!success) { + buffer_redirect.reset(); + } + return buffer_redirect; } -std::shared_ptr io_bufferfill_t::finish(std::shared_ptr &&filler) { - // The io filler is passed in. This typically holds the only instance of the write side of the - // pipe used by the buffer's fillthread (except for that side held by other processes). Get the - // buffer out of the bufferfill and clear the shared_ptr; this will typically widow the pipe. - // Then allow the buffer to finish. - assert(filler && "Null pointer in finish"); - auto buffer = filler->buffer(); - filler.reset(); - buffer->complete_background_fillthread(); - return buffer; -} - -io_pipe_t::~io_pipe_t() = default; - -io_bufferfill_t::~io_bufferfill_t() = default; - io_buffer_t::~io_buffer_t() { - assert(! fillthread_ && "io_buffer_t destroyed with outstanding fillthread"); + if (pipe_fd[0] >= 0) { + exec_close(pipe_fd[0]); + } + // Dont free fd for writing. This should already be free'd before calling exec_read_io_buffer on + // the buffer. } void io_chain_t::remove(const shared_ptr &element) { @@ -230,8 +163,11 @@ void io_print(const io_chain_t &chain) } #endif - -int move_fd_to_unused(int fd, const io_chain_t &io_chain, bool cloexec) { +/// If the given fd is used by the io chain, duplicates it repeatedly until an fd not used in the io +/// chain is found, or we run out. If we return a new fd or an error, closes the old one. Any fd +/// created is marked close-on-exec. Returns -1 on failure (in which case the given fd is still +/// closed). +static int move_fd_to_unused(int fd, const io_chain_t &io_chain) { if (fd < 0 || io_chain.get_io_for_fd(fd).get() == NULL) { return fd; } @@ -252,7 +188,7 @@ int move_fd_to_unused(int fd, const io_chain_t &io_chain, bool cloexec) { // Ok, we have a new candidate fd. Recurse. If we get a valid fd, either it's the same as // what we gave it, or it's a new fd and what we gave it has been closed. If we get a // negative value, the fd also has been closed. - if (cloexec) set_cloexec(tmp_fd); + set_cloexec(tmp_fd); new_fd = move_fd_to_unused(tmp_fd, io_chain); } @@ -264,7 +200,7 @@ int move_fd_to_unused(int fd, const io_chain_t &io_chain, bool cloexec) { return new_fd; } -static bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) { +bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios) { bool success = true; for (int i = 0; i < 2; i++) { fds[i] = move_fd_to_unused(fds[i], ios); @@ -288,27 +224,6 @@ static bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios return success; } -maybe_t make_autoclose_pipes(const io_chain_t &ios) { - int pipes[2] = {-1, -1}; - - if (pipe(pipes) < 0) { - debug(1, PIPE_ERROR); - wperror(L"pipe"); - return none(); - } - set_cloexec(pipes[0]); - set_cloexec(pipes[1]); - - if (!pipe_avoid_conflicts_with_io_chain(pipes, ios)) { - // The pipes are closed on failure here. - return none(); - } - autoclose_pipes_t result; - result.read = autoclose_fd_t(pipes[0]); - result.write = autoclose_fd_t(pipes[1]); - return maybe_t{std::move(result)}; -} - /// Return the last IO for the given fd. shared_ptr io_chain_t::get_io_for_fd(int fd) const { size_t idx = this->size(); diff --git a/src/io.h b/src/io.h index 5cffd0d21..161ec5941 100644 --- a/src/io.h +++ b/src/io.h @@ -1,21 +1,26 @@ #ifndef FISH_IO_H #define FISH_IO_H -#include #include #include #include -#include -#include -#include #include +// Note that we have to include something to get any _LIBCPP_VERSION defined so we can detect libc++ +// So it's key that vector go above. If we didn't need vector for other reasons, we might include +// ciso646, which does nothing +#if defined(_LIBCPP_VERSION) || __cplusplus > 199711L +// C++11 or libc++ (which is a C++11-only library, but the memory header works OK in C++03) +#include +using std::shared_ptr; +#else +// C++03 or libstdc++ +#include +using std::tr1::shared_ptr; +#endif #include "common.h" #include "env.h" -#include "maybe.h" - -using std::shared_ptr; /// separated_buffer_t is composed of a sequence of elements, some of which may be explicitly /// separated (e.g. through string spit0) and some of which the separation is inferred. This enum @@ -145,7 +150,7 @@ class separated_buffer_t { }; /// Describes what type of IO operation an io_data_t represents. -enum class io_mode_t { file, pipe, fd, close, bufferfill }; +enum class io_mode_t { file, pipe, fd, buffer, close }; /// Represents an FD redirection. class io_data_t { @@ -205,125 +210,64 @@ class io_file_t : public io_data_t { ~io_file_t() override { free((void *)filename_cstr); } }; -/// Represents (one end) of a pipe. class io_pipe_t : public io_data_t { - // The pipe's fd. Conceptually this is dup2'd to io_data_t::fd. - autoclose_fd_t pipe_fd_; - - /// Whether this is an input pipe. This is used only for informational purposes. - const bool is_input_; + protected: + io_pipe_t(io_mode_t m, int f, bool i) : io_data_t(m, f), is_input(i) { + pipe_fd[0] = pipe_fd[1] = -1; + } public: + int pipe_fd[2]; + const bool is_input; + void print() const override; - io_pipe_t(int fd, bool is_input, autoclose_fd_t pipe_fd) - : io_data_t(io_mode_t::pipe, fd), pipe_fd_(std::move(pipe_fd)), is_input_(is_input) {} - - ~io_pipe_t(); - - int pipe_fd() const { return pipe_fd_.fd(); } + io_pipe_t(int f, bool i) : io_data_t(io_mode_t::pipe, f), is_input(i) { + pipe_fd[0] = pipe_fd[1] = -1; + } }; -class io_buffer_t; class io_chain_t; - -/// Represents filling an io_buffer_t. Very similar to io_pipe_t. -/// Bufferfills always target stdout. -class io_bufferfill_t : public io_data_t { - /// Write end. The other end is connected to an io_buffer_t. - const autoclose_fd_t write_fd_; - - /// The receiving buffer. - const std::shared_ptr buffer_; - - public: - void print() const override; - - // The ctor is public to support make_shared() in the static create function below. - // Do not invoke this directly. - io_bufferfill_t(autoclose_fd_t write_fd, std::shared_ptr buffer) - : io_data_t(io_mode_t::bufferfill, STDOUT_FILENO), - write_fd_(std::move(write_fd)), - buffer_(std::move(buffer)) {} - - ~io_bufferfill_t(); - - std::shared_ptr buffer() const { return buffer_; } - - /// \return the fd that, when written to, fills the buffer. - int write_fd() const { return write_fd_.fd(); } - - /// Create an io_bufferfill_t which, when written from, fills a buffer with the contents. - /// \returns nullptr on failure, e.g. too many open fds. - /// - /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does - /// not conflict with an fd redirection in this list. - static shared_ptr create(const io_chain_t &conflicts, size_t buffer_limit = 0); - - /// Reset the receiver (possibly closing the write end of the pipe), and complete the fillthread - /// of the buffer. \return the buffer. - static std::shared_ptr finish(std::shared_ptr &&filler); -}; - class output_stream_t; - -/// An io_buffer_t is a buffer which can populate itself by reading from an fd. -/// It is not an io_data_t. -class io_buffer_t { +class io_buffer_t : public io_pipe_t { private: - friend io_bufferfill_t; - - /// Buffer storing what we have read. separated_buffer_t buffer_; - /// Atomic flag indicating our fillthread should shut down. - std::atomic shutdown_fillthread_; - - /// The background fillthread itself, if any. - maybe_t fillthread_{}; - - /// Read limit of the buffer. - const size_t read_limit_; - - /// Lock for appending. - std::mutex append_lock_{}; - - /// Called in the background thread to run it. - void run_background_fillthread(autoclose_fd_t readfd); - - /// Begin the background fillthread operation, reading from the given fd. - void begin_background_fillthread(autoclose_fd_t readfd); - - /// End the background fillthread operation. - void complete_background_fillthread(); - - public: - explicit io_buffer_t(size_t limit) : buffer_(limit), read_limit_(limit) { + explicit io_buffer_t(int f, size_t limit) + : io_pipe_t(io_mode_t::buffer, f, false /* not input */), buffer_(limit) { // Explicitly reset the discard flag because we share this buffer. buffer_.reset_discard(); } - ~io_buffer_t(); + public: + void print() const override; + + ~io_buffer_t() override; /// Access the underlying buffer. - /// This requires that the background fillthread be none. - const separated_buffer_t &buffer() const { - assert(!fillthread_ && "Cannot access buffer during background fill"); - return buffer_; - } + const separated_buffer_t &buffer() const { return buffer_; } /// Function to append to the buffer. - void append(const char *ptr, size_t count) { - scoped_lock locker(append_lock_); - buffer_.append(ptr, ptr + count); - } + void append(const char *ptr, size_t count) { buffer_.append(ptr, ptr + count); } - /// \return the read limit. - size_t read_limit() const { return read_limit_; } + /// Ensures that the pipes do not conflict with any fd redirections in the chain. + bool avoid_conflicts_with_io_chain(const io_chain_t &ios); + + /// Close output pipe, and read from input pipe until eof. + void read(); /// Appends data from a given output_stream_t. /// Marks the receiver as discarded if the stream was discarded. void append_from_stream(const output_stream_t &stream); + + /// Create a io_mode_t::buffer type io redirection, complete with a pipe and a vector for + /// output. The default file descriptor used is STDOUT_FILENO for buffering. + /// + /// \param fd the fd that will be mapped in the child process, typically STDOUT_FILENO + /// \param conflicts A set of IO redirections. The function ensures that any pipe it makes does + /// not conflict with an fd redirection in this list. + static shared_ptr create(int fd, const io_chain_t &conflicts, + size_t buffer_limit = 0); }; class io_chain_t : public std::vector> { @@ -343,24 +287,11 @@ class io_chain_t : public std::vector> { shared_ptr io_chain_get(const io_chain_t &src, int fd); shared_ptr io_chain_get(io_chain_t &src, int fd); -/// Helper type returned from making autoclose pipes. -struct autoclose_pipes_t { - /// Read end of the pipe. - autoclose_fd_t read; - - /// Write end of the pipe. - autoclose_fd_t write; -}; -/// Call pipe(), populating autoclose fds, avoiding conflicts. -/// The pipes are marked CLO_EXEC. -/// \return pipes on success, none() on error. -maybe_t make_autoclose_pipes(const io_chain_t &ios); - -/// If the given fd is used by the io chain, duplicates it repeatedly until an fd not used in the io -/// chain is found, or we run out. If we return a new fd or an error, closes the old one. -/// If \p cloexec is set, any fd created is marked close-on-exec. -/// \returns -1 on failure (in which case the given fd is still closed). -int move_fd_to_unused(int fd, const io_chain_t &io_chain, bool cloexec = true); +/// Given a pair of fds, if an fd is used by the given io chain, duplicate that fd repeatedly until +/// we find one that does not conflict, or we run out of fds. Returns the new fds by reference, +/// closing the old ones. If we get an error, returns false (in which case both fds are closed and +/// set to -1). +bool pipe_avoid_conflicts_with_io_chain(int fds[2], const io_chain_t &ios); /// Class representing the output that a builtin can generate. class output_stream_t { diff --git a/src/iothread.cpp b/src/iothread.cpp index c19f69339..d834da8c1 100644 --- a/src/iothread.cpp +++ b/src/iothread.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -150,14 +149,24 @@ static void *iothread_worker(void *unused) { /// Spawn another thread. No lock is held when this is called. static void iothread_spawn() { + // The spawned thread inherits our signal mask. We don't want the thread to ever receive signals + // on the spawned thread, so temporarily block all signals, spawn the thread, and then restore + // it. + sigset_t new_set, saved_set; + sigfillset(&new_set); + DIE_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set)); + // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very // unlikely that they are all on the verge of exiting, so one is likely to be ready to handle // extant requests. So we can ignore failure with some confidence. pthread_t thread = 0; - if (make_pthread(&thread, iothread_worker, nullptr)) { - // We will never join this thread. - DIE_ON_FAILURE(pthread_detach(thread)); - } + pthread_create(&thread, NULL, iothread_worker, NULL); + + // We will never join this thread. + DIE_ON_FAILURE(pthread_detach(thread)); + debug(5, "pthread %p spawned", (void *)(intptr_t)thread); + // Restore our sigmask. + DIE_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL)); } int iothread_perform_impl(void_function_t &&func, void_function_t &&completion) { @@ -333,48 +342,3 @@ void iothread_perform_on_main(void_function_t &&func) { // Ok, the request must now be done. assert(req.done); } - -bool make_pthread(pthread_t *result, void *(*func)(void *), void *param) { - // The spawned thread inherits our signal mask. We don't want the thread to ever receive signals - // on the spawned thread, so temporarily block all signals, spawn the thread, and then restore - // it. - sigset_t new_set, saved_set; - sigfillset(&new_set); - DIE_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set)); - - // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very - // unlikely that they are all on the verge of exiting, so one is likely to be ready to handle - // extant requests. So we can ignore failure with some confidence. - pthread_t thread = 0; - int err = pthread_create(&thread, NULL, func, param); - if (err == 0) { - // Success, return the thread. - debug(5, "pthread %p spawned", (void *)(intptr_t)thread); - *result = thread; - } else { - perror("pthread_create"); - } - // Restore our sigmask. - DIE_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, NULL)); - return err == 0; -} - -using void_func_t = std::function; - -static void *func_invoker(void *param) { - void_func_t *vf = static_cast(param); - (*vf)(); - delete vf; - return nullptr; -} - -bool make_pthread(pthread_t *result, void_func_t &&func) { - // Copy the function into a heap allocation. - void_func_t *vf = new void_func_t(std::move(func)); - if (make_pthread(result, func_invoker, vf)) { - return true; - } - // Thread spawning failed, clean up our heap allocation. - delete vf; - return false; -} diff --git a/src/iothread.h b/src/iothread.h index 5e677b639..8b29370c8 100644 --- a/src/iothread.h +++ b/src/iothread.h @@ -69,16 +69,10 @@ int iothread_perform(const HANDLER &handler, const COMPLETION &completion) { // variant of iothread_perform without a completion handler inline int iothread_perform(std::function &&func) { - return iothread_perform_impl(std::move(func), {}); + return iothread_perform_impl(std::move(func), std::function()); } /// Performs a function on the main thread, blocking until it completes. void iothread_perform_on_main(std::function &&func); -/// Creates a pthread, manipulating the signal mask so that the thread receives no signals. -/// The pthread runs \p func. -/// \returns true on success, false on failure. -bool make_pthread(pthread_t *result, void *(*func)(void *), void *param); -bool make_pthread(pthread_t *result, std::function &&func); - #endif diff --git a/src/postfork.cpp b/src/postfork.cpp index d8ce8272e..7a6f655c0 100644 --- a/src/postfork.cpp +++ b/src/postfork.cpp @@ -19,7 +19,6 @@ #include "iothread.h" #include "postfork.h" #include "proc.h" -#include "redirection.h" #include "signal.h" #include "wutil.h" // IWYU pragma: keep @@ -39,6 +38,27 @@ /// Fork error message. #define FORK_ERROR "Could not create child process - exiting" +/// File redirection clobbering error message. +#define NOCLOB_ERROR "The file '%s' already exists" + +/// File redirection error message. +#define FILE_ERROR "An error occurred while redirecting file '%s'" + +/// File descriptor redirection error message. +#define FD_ERROR "An error occurred while redirecting file descriptor %s" + +/// Pipe error message. +#define LOCAL_PIPE_ERROR "An error occurred while setting up pipe" + +static bool log_redirections = false; + +/// Cover for debug_safe that can take an int. The format string should expect a %s. +static void debug_safe_int(int level, const char *format, int val) { + char buff[128]; + format_long_safe(buff, val); + debug_safe(level, format, buff); +} + /// Called only by the child to set its own process group (possibly creating a new group in the /// process if it is the first in a JOB_CONTROL job. /// Returns true on sucess, false on failiure. @@ -155,23 +175,126 @@ bool maybe_assign_terminal(const job_t *j) { return true; } -int setup_child_process(process_t *p, const dup2_list_t &dup2s) { - for (const auto &act : dup2s.get_actions()) { - int err = act.target < 0 ? close(act.src) : dup2(act.src, act.target); - if (err < 0) { - // We have a null p if this is for the exec (non-fork) path. - if (p != nullptr) { - debug_safe(4, "redirect_in_child_after_fork failed in setup_child_process"); - exit_without_destructors(1); +/// Set up a childs io redirections. Should only be called by setup_child_process(). Does the +/// following: First it closes any open file descriptors not related to the child by calling +/// close_unused_internal_pipes() and closing the universal variable server file descriptor. It then +/// goes on to perform all the redirections described by \c io. +/// +/// \param io_chain the list of IO redirections for the child +/// +/// \return 0 on sucess, -1 on failure +static int handle_child_io(const io_chain_t &io_chain) { + for (size_t idx = 0; idx < io_chain.size(); idx++) { + const io_data_t *io = io_chain.at(idx).get(); + + if (io->io_mode == io_mode_t::fd && io->fd == static_cast(io)->old_fd) { + continue; + } + + switch (io->io_mode) { + case io_mode_t::close: { + if (log_redirections) fwprintf(stderr, L"%d: close %d\n", getpid(), io->fd); + if (close(io->fd)) { + debug_safe_int(0, "Failed to close file descriptor %s", io->fd); + safe_perror("close"); + } + break; + } + + case io_mode_t::file: { + // Here we definitely do not want to set CLO_EXEC because our child needs access. + const io_file_t *io_file = static_cast(io); + int tmp = open(io_file->filename_cstr, io_file->flags, OPEN_MASK); + if (tmp < 0) { + if ((io_file->flags & O_EXCL) && (errno == EEXIST)) { + debug_safe(1, NOCLOB_ERROR, io_file->filename_cstr); + } else { + debug_safe(1, FILE_ERROR, io_file->filename_cstr); + safe_perror("open"); + } + + return -1; + } else if (tmp != io->fd) { + // This call will sometimes fail, but that is ok, this is just a precausion. + close(io->fd); + + if (dup2(tmp, io->fd) == -1) { + debug_safe_int(1, FD_ERROR, io->fd); + safe_perror("dup2"); + exec_close(tmp); + return -1; + } + exec_close(tmp); + } + break; + } + + case io_mode_t::fd: { + int old_fd = static_cast(io)->old_fd; + if (log_redirections) + fwprintf(stderr, L"%d: fd dup %d to %d\n", getpid(), old_fd, io->fd); + + // This call will sometimes fail, but that is ok, this is just a precausion. + close(io->fd); + + if (dup2(old_fd, io->fd) == -1) { + debug_safe_int(1, FD_ERROR, io->fd); + safe_perror("dup2"); + return -1; + } + break; + } + + case io_mode_t::buffer: + case io_mode_t::pipe: { + const io_pipe_t *io_pipe = static_cast(io); + // If write_pipe_idx is 0, it means we're connecting to the read end (first pipe + // fd). If it's 1, we're connecting to the write end (second pipe fd). + unsigned int write_pipe_idx = (io_pipe->is_input ? 0 : 1); +#if 0 + debug(0, L"%ls %ls on fd %d (%d %d)", write_pipe?L"write":L"read", + (io->io_mode == io_mode_t::buffer)?L"buffer":L"pipe", io->fd, io->pipe_fd[0], + io->pipe_fd[1]); +#endif + if (log_redirections) + fwprintf(stderr, L"%d: %s dup %d to %d\n", getpid(), + io->io_mode == io_mode_t::buffer ? "buffer" : "pipe", + io_pipe->pipe_fd[write_pipe_idx], io->fd); + if (dup2(io_pipe->pipe_fd[write_pipe_idx], io->fd) != io->fd) { + debug_safe(1, LOCAL_PIPE_ERROR); + safe_perror("dup2"); + return -1; + } + + if (io_pipe->pipe_fd[0] >= 0) exec_close(io_pipe->pipe_fd[0]); + if (io_pipe->pipe_fd[1] >= 0) exec_close(io_pipe->pipe_fd[1]); + break; } - return err; } } - // Set the handling for job control signals back to the default. - signal_reset_handlers(); + return 0; } +int setup_child_process(process_t *p, const io_chain_t &io_chain) { + bool ok = true; + + if (ok) { + // In the case of io_mode_t::file, this can hang until data is available to read/write! + ok = (0 == handle_child_io(io_chain)); + if (p != 0 && !ok) { + debug_safe(4, "handle_child_io failed in setup_child_process"); + exit_without_destructors(1); + } + } + + if (ok) { + // Set the handling for job control signals back to the default. + signal_reset_handlers(); + } + + return ok ? 0 : -1; +} int g_fork_count = 0; @@ -223,8 +346,9 @@ pid_t execute_fork(bool wait_for_threads_to_die) { #if FISH_USE_POSIX_SPAWN bool fork_actions_make_spawn_properties(posix_spawnattr_t *attr, - posix_spawn_file_actions_t *actions, const job_t *j, - const dup2_list_t &dup2s) { + posix_spawn_file_actions_t *actions, job_t *j, process_t *p, + const io_chain_t &io_chain) { + UNUSED(p); // Initialize the output. if (posix_spawnattr_init(attr) != 0) { return false; @@ -278,13 +402,52 @@ bool fork_actions_make_spawn_properties(posix_spawnattr_t *attr, sigemptyset(&sigmask); if (!err && reset_sigmask) err = posix_spawnattr_setsigmask(attr, &sigmask); - // Apply our dup2s. - for (const auto &act : dup2s.get_actions()) { - if (err) break; - if (act.target < 0) { - err = posix_spawn_file_actions_addclose(actions, act.src); - } else { - err = posix_spawn_file_actions_adddup2(actions, act.src, act.target); + for (size_t idx = 0; idx < io_chain.size(); idx++) { + const shared_ptr io = io_chain.at(idx); + + if (io->io_mode == io_mode_t::fd) { + const io_fd_t *io_fd = static_cast(io.get()); + if (io->fd == io_fd->old_fd) continue; + } + + switch (io->io_mode) { + case io_mode_t::close: { + if (!err) err = posix_spawn_file_actions_addclose(actions, io->fd); + break; + } + + case io_mode_t::file: { + const io_file_t *io_file = static_cast(io.get()); + if (!err) + err = posix_spawn_file_actions_addopen(actions, io->fd, io_file->filename_cstr, + io_file->flags /* mode */, OPEN_MASK); + break; + } + + case io_mode_t::fd: { + const io_fd_t *io_fd = static_cast(io.get()); + if (!err) + err = posix_spawn_file_actions_adddup2(actions, io_fd->old_fd /* from */, + io->fd /* to */); + break; + } + + case io_mode_t::buffer: + case io_mode_t::pipe: { + const io_pipe_t *io_pipe = static_cast(io.get()); + unsigned int write_pipe_idx = (io_pipe->is_input ? 0 : 1); + int from_fd = io_pipe->pipe_fd[write_pipe_idx]; + int to_fd = io->fd; + if (!err) err = posix_spawn_file_actions_adddup2(actions, from_fd, to_fd); + + if (write_pipe_idx > 0) { + if (!err) err = posix_spawn_file_actions_addclose(actions, io_pipe->pipe_fd[0]); + if (!err) err = posix_spawn_file_actions_addclose(actions, io_pipe->pipe_fd[1]); + } else { + if (!err) err = posix_spawn_file_actions_addclose(actions, io_pipe->pipe_fd[0]); + } + break; + } } } diff --git a/src/postfork.h b/src/postfork.h index d91d335cf..f7cfe080d 100644 --- a/src/postfork.h +++ b/src/postfork.h @@ -14,7 +14,7 @@ #define FISH_USE_POSIX_SPAWN HAVE_SPAWN_H #endif -class dup2_list_t; +class io_chain_t; class job_t; class process_t; @@ -29,11 +29,11 @@ bool maybe_assign_terminal(const job_t *j); /// descriptor actions are performed. /// /// \param p the child process to set up -/// \param dup2 the dup2 list to apply +/// \param io_chain the IO chain to use /// /// \return 0 on sucess, -1 on failiure. When this function returns, signals are always unblocked. /// On failiure, signal handlers, io redirections and process group of the process is undefined. -int setup_child_process(process_t *p, const dup2_list_t &dup2s); +int setup_child_process(process_t *p, const io_chain_t &io_chain); /// Call fork(), optionally waiting until we are no longer multithreaded. If the forked child /// doesn't do anything that could allocate memory, take a lock, etc. (like call exec), then it's @@ -55,8 +55,8 @@ void run_as_keepalive(pid_t parent_pid); /// Initializes and fills in a posix_spawnattr_t; on success, the caller should destroy it via /// posix_spawnattr_destroy. bool fork_actions_make_spawn_properties(posix_spawnattr_t *attr, - posix_spawn_file_actions_t *actions, const job_t *j, - const dup2_list_t &dup2s); + posix_spawn_file_actions_t *actions, job_t *j, process_t *p, + const io_chain_t &io_chain); #endif #endif diff --git a/src/proc.cpp b/src/proc.cpp index a440d1cda..cdacd05bb 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -836,6 +836,89 @@ void proc_update_jiffies() { #endif +/// The return value of select_try(), indicating IO readiness or an error +enum class select_try_t { + /// One or more fds have data ready for read + DATA_READY, + /// The timeout elapsed without any data becoming available for read + TIMEOUT, + /// There were no FDs in the io chain for which to select on. + IOCHAIN_EMPTY, +}; + +/// Check if there are buffers associated with the job, and select on them for a while if available. +/// +/// \param j the job to test +/// \return the status of the select operation +static select_try_t select_try(job_t *j) { + fd_set fds; + int maxfd = -1; + + FD_ZERO(&fds); + + const io_chain_t chain = j->all_io_redirections(); + for (const auto &io : chain) { + if (io->io_mode == io_mode_t::buffer) { + auto io_pipe = static_cast(io.get()); + int fd = io_pipe->pipe_fd[0]; + FD_SET(fd, &fds); + maxfd = std::max(maxfd, fd); + debug(4, L"select_try on fd %d", fd); + } + } + + if (maxfd >= 0) { + struct timeval timeout; + + timeout.tv_sec = 0; + timeout.tv_usec = 10000; + + int retval = select(maxfd + 1, &fds, 0, 0, &timeout); + if (retval == 0) { + debug(4, L"select_try hit timeout"); + return select_try_t::TIMEOUT; + } + return select_try_t::DATA_READY; + } + + return select_try_t::IOCHAIN_EMPTY; +} + +/// Read from descriptors until they are empty. +/// +/// \param j the job to test +static void read_try(job_t *j) { + io_buffer_t *buff = NULL; + + // Find the last buffer, which is the one we want to read from. + const io_chain_t chain = j->all_io_redirections(); + for (size_t idx = 0; idx < chain.size(); idx++) { + io_data_t *d = chain.at(idx).get(); + if (d->io_mode == io_mode_t::buffer) { + buff = static_cast(d); + } + } + + if (buff) { + debug(4, L"proc::read_try('%ls')", j->command_wcstr()); + while (1) { + char b[BUFFER_SIZE]; + long len = read_blocked(buff->pipe_fd[0], b, BUFFER_SIZE); + if (len == 0) { + break; + } else if (len < 0) { + if (errno != EAGAIN) { + debug(1, _(L"An error occured while reading output from code block")); + wperror(L"read_try"); + } + break; + } else { + buff->append(b, len); + } + } + } +} + // Return control of the terminal to a job's process group. restore_attrs is true if we are restoring // a previously-stopped job, in which case we need to restore terminal attributes. bool terminal_give_to_job(const job_t *j, bool restore_attrs) { @@ -1017,6 +1100,7 @@ void job_t::continue_job(bool send_sigcont) { } }); + bool read_attempted = false; if (!is_completed()) { if (get_flag(job_flag_t::TERMINAL) && is_foreground()) { // Put the job into the foreground and give it control of the terminal. @@ -1049,15 +1133,71 @@ void job_t::continue_job(bool send_sigcont) { } if (is_foreground()) { - // Wait for the status of our own job to change. + // This is an optimization to not call select_try() in case a process has exited. While + // it may seem silly, unless there is IO (and there usually isn't in terms of total CPU + // time), select_try() will wait for 10ms (our timeout) before returning. If during + // these 10ms a process exited, the shell will basically hang until the timeout happens + // and we are free to call `process_mark_finished_children()` to discover that fact. By + // calling it here before calling `select_try()` below, shell responsiveness can be + // dramatically improved (noticably so, not just "theoretically speaking" per the + // discussion in #5219). + process_mark_finished_children(false); + + // If this is a child job and the parent job is still under construction (i.e. job1 | + // some_func), we can't block on execution of the nested job for `some_func`. Doing + // so can cause hangs if job1 emits more data than fits in the OS pipe buffer. + // The solution is to to not block on fg from the initial call in exec_job(), which + // is also the only place that send_sigcont is false. parent_job.is_constructed() + // must also be true, which coincides with WAIT_BY_PROCESS (which will have to do + // since we don't store a reference to the parent job in the job_t structure). + bool block_on_fg = send_sigcont && job_chain_is_fully_constructed(); + + // Wait for data to become available or the status of our own job to change while (!reader_exit_forced() && !is_stopped() && !is_completed()) { - process_mark_finished_children(true); + auto result = select_try(this); + read_attempted = true; + + switch (result) { + case select_try_t::DATA_READY: + // Read the data that we know is now available, then scan for finished processes + // but do not block. We don't block so long as we have IO to process, once the + // fd buffers are empty we'll block in the second case below. + read_try(this); + process_mark_finished_children(false); + break; + + case select_try_t::TIMEOUT: + // No FDs are ready. Look for finished processes instead. + debug(4, L"select_try: no fds returned valid data within the timeout" ); + process_mark_finished_children(block_on_fg); + break; + + case select_try_t::IOCHAIN_EMPTY: + // There were no IO fds to select on. + debug(4, L"select_try: no IO fds" ); + process_mark_finished_children(true); + + // If it turns out that we encountered this because the file descriptor we were + // reading from has died, process_mark_finished_children() should take care of + // changing the status of our is_completed() (assuming it is appropriate to do + // so), in which case we will break out of this loop. + break; + } } } } if (is_foreground()) { if (is_completed()) { + // It's possible that the job will produce output and exit before we've even read from + // it. In that case, make sure we read that output now, before we've executed any + // subsequent calls. This is why prompt colors were getting screwed up - the builtin + // `echo` calls were sometimes having their output combined with the `set_color` calls + // in the wrong order! + if (!read_attempted) { + read_try(this); + } + // Set $status only if we are in the foreground and the last process in the job has // finished and is not a short-circuited builtin. auto &p = processes.back(); diff --git a/src/redirection.cpp b/src/redirection.cpp deleted file mode 100644 index 6c00c4d55..000000000 --- a/src/redirection.cpp +++ /dev/null @@ -1,92 +0,0 @@ -#include "config.h" // IWYU pragma: keep - -#include "redirection.h" -#include "wutil.h" - -#include - -/// File descriptor redirection error message. -#define FD_ERROR "An error occurred while redirecting file descriptor %s" - -/// Pipe error message. -#define LOCAL_PIPE_ERROR "An error occurred while setting up pipe" - -#define NOCLOB_ERROR _(L"The file '%s' already exists") - -#define FILE_ERROR _(L"An error occurred while redirecting file '%s'") - -/// Base open mode to pass to calls to open. -#define OPEN_MASK 0666 - -dup2_list_t::~dup2_list_t() = default; - -maybe_t dup2_list_t::resolve_chain(const io_chain_t &io_chain) { - ASSERT_IS_NOT_FORKED_CHILD(); - dup2_list_t result; - for (const auto &io_ref : io_chain) { - switch (io_ref->io_mode) { - case io_mode_t::file: { - // Here we definitely do not want to set CLO_EXEC because our child needs access. - // Open the file. - const io_file_t *io_file = static_cast(io_ref.get()); - int file_fd = open(io_file->filename_cstr, io_file->flags, OPEN_MASK); - if (file_fd < 0) { - if ((io_file->flags & O_EXCL) && (errno == EEXIST)) { - debug(1, NOCLOB_ERROR, io_file->filename_cstr); - } else { - debug(1, FILE_ERROR, io_file->filename_cstr); - if (should_debug(1)) wperror(L"open"); - } - return none(); - } - - // If by chance we got the file we want, we're done. Otherwise move the fd to an unused place and dup2 it. - // Note move_fd_to_unused() will close the incoming file_fd. - if (file_fd != io_file->fd) { - file_fd = move_fd_to_unused(file_fd, io_chain, false /* cloexec */); - if (file_fd < 0) { - debug(1, FILE_ERROR, io_file->filename_cstr); - if (should_debug(1)) wperror(L"dup"); - return none(); - } - } - - // Record that we opened this file, so we will auto-close it. - assert(file_fd >= 0 && "Should have a valid file_fd"); - result.opened_fds_.emplace_back(file_fd); - - // Mark our dup2 and our close actions. - result.add_dup2(file_fd, io_file->fd); - result.add_close(file_fd); - break; - } - - case io_mode_t::close: { - const io_close_t *io = static_cast(io_ref.get()); - result.add_close(io->fd); - break; - } - - case io_mode_t::fd: { - const io_fd_t *io = static_cast(io_ref.get()); - result.add_dup2(io->old_fd, io->fd); - break; - } - - case io_mode_t::pipe: { - const io_pipe_t *io = static_cast(io_ref.get()); - result.add_dup2(io->pipe_fd(), io->fd); - result.add_close(io->pipe_fd()); - break; - } - - case io_mode_t::bufferfill: { - const io_bufferfill_t *io = static_cast(io_ref.get()); - result.add_dup2(io->write_fd(), io->fd); - result.add_close(io->write_fd()); - break; - } - } - } - return result; -} diff --git a/src/redirection.h b/src/redirection.h deleted file mode 100644 index 95cc8fac0..000000000 --- a/src/redirection.h +++ /dev/null @@ -1,64 +0,0 @@ -#ifndef FISH_REDIRECTION_H -#define FISH_REDIRECTION_H - -#include "common.h" -#include "maybe.h" -#include "io.h" - -#include - -/// This file supports "applying" redirections. - -/// A class representing a sequence of basic redirections. -class dup2_list_t { - public: - /// A type that represents the action dup2(src, target). - /// If target is negative, this represents close(src). - /// Note none of the fds here are considered 'owned'. - struct action_t { - int src; - int target; - }; - - private: - /// The list of actions. - std::vector actions_; - - /// The list of fds that we opened, and are responsible for closing. - std::vector opened_fds_; - - /// Append a dup2 action. - void add_dup2(int src, int target) { - assert(src >= 0 && target >= 0 && "Invalid fd in add_dup2"); - if (src != target) { - actions_.push_back(action_t{src, target}); - } - } - - /// Append a close action. - void add_close(int fd) { - assert(fd >= 0 && "Invalid fd in add_close"); - actions_.push_back(action_t{fd, -1}); - } - - dup2_list_t() = default; - -public: - ~dup2_list_t(); - - /// Disable copying because we own our fds. - dup2_list_t(const dup2_list_t &) = delete; - void operator=(const dup2_list_t &) = delete; - - dup2_list_t(dup2_list_t &&) = default; - dup2_list_t &operator=(dup2_list_t &&) = default; - - /// \return the list of dup2 actions. - const std::vector &get_actions() const { return actions_; } - - /// Produce a dup_fd_list_t from an io_chain. This may not be called before fork(). - /// The result contains the list of fd actions (dup2 and close), as well as the list of fds opened. - static maybe_t resolve_chain(const io_chain_t &); -}; - -#endif