From 206b2d0a26a2fd37dc4a1b7313151f7e4b93b84a Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Thu, 6 Aug 2020 19:00:53 -0700 Subject: [PATCH] Simplify topic monitoring The topic monitor allows a client to wait for multiple events, e.g. sigchld or an internal process exit. Prior to this change a client had to specify the list of generations and the list of topics they are interested in. Simplify this to just the list of generations, with a max-value generation meaning the topic is not interesting. Also remove the use of enum_set and enum_array, it was too complex for what it offered. --- src/fish_tests.cpp | 29 ++++----- src/proc.cpp | 34 ++++------- src/signal.cpp | 9 ++- src/signal.h | 2 +- src/topic_monitor.cpp | 68 +++++++++++---------- src/topic_monitor.h | 136 ++++++++++++++++++++++++++++++------------ 6 files changed, 170 insertions(+), 108 deletions(-) diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp index d46513b93..c095dcb12 100644 --- a/src/fish_tests.cpp +++ b/src/fish_tests.cpp @@ -5782,22 +5782,23 @@ static void test_topic_monitor() { topic_monitor_t monitor; generation_list_t gens{}; constexpr auto t = topic_t::sigchld; - do_test(gens[t] == 0); + gens.sigchld = 0; do_test(monitor.generation_for_topic(t) == 0); - auto changed = monitor.check(&gens, topic_set_t{t}, false /* wait */); - do_test(changed.none()); - do_test(gens[t] == 0); + auto changed = monitor.check(&gens, false /* wait */); + do_test(!changed); + do_test(gens.sigchld == 0); monitor.post(t); - changed = monitor.check(&gens, topic_set_t{t}, true /* wait */); - do_test(changed == topic_set_t{t}); - do_test(gens[t] == 1); + changed = monitor.check(&gens, true /* wait */); + do_test(changed); + do_test(gens.at(t) == 1); do_test(monitor.generation_for_topic(t) == 1); monitor.post(t); do_test(monitor.generation_for_topic(t) == 2); - changed = monitor.check(&gens, topic_set_t{t}, true /* wait */); - do_test(changed == topic_set_t{t}); + changed = monitor.check(&gens, true /* wait */); + do_test(changed); + do_test(gens.sigchld == 2); } static void test_topic_monitor_torture() { @@ -5807,7 +5808,7 @@ static void test_topic_monitor_torture() { constexpr auto t1 = topic_t::sigchld; constexpr auto t2 = topic_t::sighupint; std::vector gens; - gens.resize(thread_count, generation_list_t{}); + gens.resize(thread_count, generation_list_t::invalids()); std::atomic post_count{}; for (auto &gen : gens) { gen = monitor.current_generations(); @@ -5822,11 +5823,11 @@ static void test_topic_monitor_torture() { [&](size_t i) { for (size_t j = 0; j < (1 << 11); j++) { auto before = gens[i]; - auto changed = monitor.check(&gens[i], topic_set_t{t1, t2}, true /* wait */); + auto changed = monitor.check(&gens[i], true /* wait */); (void)changed; - do_test(before[t1] < gens[i][t1]); - do_test(gens[i][t1] <= post_count); - do_test(gens[i][t2] == 0); + do_test(before.at(t1) < gens[i].at(t1)); + do_test(gens[i].at(t1) <= post_count); + do_test(gens[i].at(t2) == 0); } auto amt = completed.fetch_add(1, std::memory_order_relaxed); (void)amt; diff --git a/src/proc.cpp b/src/proc.cpp index 06e46b511..05c9e5ed5 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -388,43 +388,35 @@ static void process_mark_finished_children(parser_t &parser, bool block_ok) { // The exit generation tells us if we have an exit; the signal generation allows for detecting // SIGHUP and SIGINT. // Get the gen count of all reapable processes. - topic_set_t reaptopics{}; - generation_list_t gens{}; - gens.fill(invalid_generation); + generation_list_t reapgens = generation_list_t::invalids(); for (const auto &j : parser.jobs()) { for (const auto &proc : j->processes) { if (auto mtopic = j->reap_topic_for_process(proc.get())) { - topic_t topic = *mtopic; - reaptopics.set(topic); - gens[topic] = std::min(gens[topic], proc->gens_[topic]); - - reaptopics.set(topic_t::sighupint); - gens[topic_t::sighupint] = - std::min(gens[topic_t::sighupint], proc->gens_[topic_t::sighupint]); + reapgens.set_min_from(*mtopic, proc->gens_); + reapgens.set_min_from(topic_t::sighupint, proc->gens_); } } } - if (reaptopics.none()) { - // No reapable processes, nothing to wait for. + // Now check for changes, optionally waiting. + if (!topic_monitor_t::principal().check(&reapgens, block_ok)) { + // Nothing changed. return; } - // Now check for changes, optionally waiting. - auto changed_topics = topic_monitor_t::principal().check(&gens, reaptopics, block_ok); - if (changed_topics.none()) return; - // We got some changes. Since we last checked we received SIGCHLD, and or HUP/INT. // Update the hup/int generations and reap any reapable processes. for (auto &j : parser.jobs()) { for (const auto &proc : j->processes) { if (auto mtopic = j->reap_topic_for_process(proc.get())) { - // Update the signal hup/int gen. - proc->gens_[topic_t::sighupint] = gens[topic_t::sighupint]; + topic_t topic = *mtopic; - if (proc->gens_[*mtopic] < gens[*mtopic]) { - // Potentially reapable. Update its gen count and try reaping it. - proc->gens_[*mtopic] = gens[*mtopic]; + // Update the signal hup/int gen. + proc->gens_.sighupint = reapgens.sighupint; + + if (proc->gens_.at(topic) < reapgens.at(topic)) { + // Potentially reapable. Update its generation and try reaping it. + proc->gens_.at(topic) = reapgens.at(topic); if (proc->internal_proc_) { // Try reaping an internal process. if (proc->internal_proc_->exited()) { diff --git a/src/signal.cpp b/src/signal.cpp index 65424ec58..068f77d56 100644 --- a/src/signal.cpp +++ b/src/signal.cpp @@ -404,8 +404,7 @@ void signal_unblock_all() { sigprocmask(SIG_SETMASK, &iset, nullptr); } -sigchecker_t::sigchecker_t(topic_t signal) { - topic_ = signal; +sigchecker_t::sigchecker_t(topic_t signal) : topic_(signal) { // Call check() to update our generation. check(); } @@ -420,7 +419,7 @@ bool sigchecker_t::check() { void sigchecker_t::wait() const { auto &tm = topic_monitor_t::principal(); - generation_list_t gens{}; - gens[topic_] = this->gen_; - tm.check(&gens, {topic_}, true /* wait */); + generation_list_t gens = generation_list_t::invalids(); + gens.at(topic_) = this->gen_; + tm.check(&gens, true /* wait */); } diff --git a/src/signal.h b/src/signal.h index abc3a4a57..99c318646 100644 --- a/src/signal.h +++ b/src/signal.h @@ -45,7 +45,7 @@ void signal_clear_cancel(); enum class topic_t : uint8_t; /// A sigint_detector_t can be used to check if a SIGINT (or SIGHUP) has been delivered. class sigchecker_t { - topic_t topic_; + const topic_t topic_; uint64_t gen_{0}; public: diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp index fb0e9b55f..ee62a046e 100644 --- a/src/topic_monitor.cpp +++ b/src/topic_monitor.cpp @@ -7,13 +7,14 @@ #include "flog.h" #include "iothread.h" +#include "wcstringutil.h" #include "wutil.h" // Whoof. Thread Sanitizer swallows signals and replays them at its leisure, at the point where // instrumented code makes certain blocking calls. But tsan cannot interrupt a signal call, so // if we're blocked in read() (like the topic monitor wants to be!), we'll never receive SIGCHLD // and so deadlock. So if tsan is enabled, we mark our fd as non-blocking (so reads will never -// block) and use use select() to poll it. +// block) and use select() to poll it. #if defined(__has_feature) #if __has_feature(thread_sanitizer) #define TOPIC_MONITOR_TSAN_WORKAROUND @@ -23,16 +24,23 @@ #define TOPIC_MONITOR_TSAN_WORKAROUND #endif +wcstring generation_list_t::describe() const { + wcstring result; + for (generation_t gen : this->as_array()) { + if (!result.empty()) result.push_back(L','); + if (gen == invalid_generation) { + result.append(L"-1"); + } else { + result.append(to_string(gen)); + } + } + return result; +} + /// Implementation of the principal monitor. This uses new (and leaks) to avoid registering a /// pointless at-exit handler for the dtor. static topic_monitor_t *const s_principal = new topic_monitor_t(); -/// \return the metagen for a topic generation list. -/// The metagen is simply the sum of topic generations. Note it is monotone. -static generation_t metagen_for(const generation_list_t &lst) { - return std::accumulate(lst.begin(), lst.end(), generation_t{0}); -} - topic_monitor_t &topic_monitor_t::principal() { // Do not attempt to move s_principal to a function-level static, it needs to be accessed from a // signal handler so it must not be lazily created. @@ -59,7 +67,7 @@ topic_monitor_t::~topic_monitor_t() = default; void topic_monitor_t::post(topic_t topic) { // Beware, we may be in a signal handler! // Atomically update the pending topics. - auto rawtopics = topic_set_t{topic}.to_raw(); + auto rawtopics = topic_to_bit(topic); auto oldtopics = pending_updates_.fetch_or(rawtopics, std::memory_order_relaxed); if ((oldtopics & rawtopics) == rawtopics) { // No new bits were set. @@ -83,26 +91,26 @@ generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock &d // If there are no pending updates (likely), just return. // Otherwise CAS in 0 and update our topics. const auto relaxed = std::memory_order_relaxed; - topic_set_raw_t raw; + topic_bitmask_t changed_topic_bits; bool cas_success; do { - raw = pending_updates_.load(relaxed); - if (raw == 0) return data->current_gens; - cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed); + changed_topic_bits = pending_updates_.load(relaxed); + if (changed_topic_bits == 0) return data->current; + cas_success = + pending_updates_.compare_exchange_weak(changed_topic_bits, 0, relaxed, relaxed); } while (!cas_success); // Update the current generation with our topics and return it. - auto topics = topic_set_t::from_raw(raw); - for (topic_t topic : topic_iter_t{}) { - if (topics.get(topic)) { - data->current_gens.at(topic) += 1; + for (topic_t topic : all_topics()) { + if (changed_topic_bits & topic_to_bit(topic)) { + data->current.at(topic) += 1; FLOG(topic_monitor, "Updating topic", static_cast(topic), "to", - data->current_gens.at(topic)); + data->current.at(topic)); } } // Report our change. data_notifier_.notify_all(); - return data->current_gens; + return data->current; } generation_list_t topic_monitor_t::updated_gens() { @@ -116,8 +124,8 @@ bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *g for (;;) { // See if the updated gen list has changed. If so we don't need to become the reader. auto current = updated_gens_in_data(data); - FLOG(topic_monitor, "TID", thread_id(), "local mgen", metagen_for(*gens), ": current", - metagen_for(current)); + FLOG(topic_monitor, "TID", thread_id(), "local ", gens->describe(), ": current", + current.describe()); if (*gens != current) { *gens = current; break; @@ -162,9 +170,9 @@ generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gen // We are finished reading. We must stop being the reader, and post on the condition // variable to wake up any other threads waiting for us to finish reading. auto data = data_.acquire(); - gens = data->current_gens; - FLOG(topic_monitor, "TID", thread_id(), "local mgen", metagen_for(input_gens), - "read() complete, current mgen is", metagen_for(gens)); + gens = data->current; + FLOG(topic_monitor, "TID", thread_id(), "local", input_gens.describe(), + "read() complete, current is", gens.describe()); assert(data->has_reader && "We should be the reader"); data->has_reader = false; data_notifier_.notify_all(); @@ -173,26 +181,26 @@ generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gen return gens; } -topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) { - if (topics.none()) return topics; +bool topic_monitor_t::check(generation_list_t *gens, bool wait) { + if (!gens->any_valid()) return false; generation_list_t current = updated_gens(); - topic_set_t changed{}; + bool changed = false; for (;;) { // Load the topic list and see if anything has changed. - for (topic_t topic : topic_iter_t{}) { - if (topics.get(topic)) { + for (topic_t topic : all_topics()) { + if (gens->is_valid(topic)) { assert(gens->at(topic) <= current.at(topic) && "Incoming gen count exceeded published count"); if (gens->at(topic) < current.at(topic)) { gens->at(topic) = current.at(topic); - changed.set(topic); + changed = true; } } } // If we're not waiting, or something changed, then we're done. - if (!wait || changed.any()) { + if (!wait || changed) { break; } diff --git a/src/topic_monitor.h b/src/topic_monitor.h index 36b2e2594..10511a557 100644 --- a/src/topic_monitor.h +++ b/src/topic_monitor.h @@ -9,7 +9,6 @@ #include #include "common.h" -#include "enum_set.h" #include "io.h" /** Topic monitoring support. Topics are conceptually "a thing that can happen." For example, @@ -32,50 +31,109 @@ set. This is the real power of topics: you can wait for a sigchld signal OR a thread exit. */ -/// The list of topics that may be observed. -enum class topic_t : uint8_t { - sigchld, // Corresponds to SIGCHLD signal. - sighupint, // Corresponds to both SIGHUP and SIGINT signals. - internal_exit, // Corresponds to an internal process exit. - COUNT -}; - -/// Allow enum_iter to be used. -template <> -struct enum_info_t { - static constexpr auto count = topic_t::COUNT; -}; - -/// Set of topics. -using topic_set_t = enum_set_t; - -/// Counting iterator for topics. -using topic_iter_t = enum_iter_t; - /// A generation is a counter incremented every time the value of a topic changes. /// It is 64 bit so it will never wrap. using generation_t = uint64_t; -/// A generation value which is guaranteed to never be set and be larger than any valid generation. +/// A generation value which indicates the topic is not of interest. constexpr generation_t invalid_generation = std::numeric_limits::max(); -/// List of generation values, indexed by topics. -/// The initial value of a generation is always 0. -using generation_list_t = enum_array_t; +/// The list of topics which may be observed. +enum class topic_t : uint8_t { + sighupint, // Corresponds to both SIGHUP and SIGINT signals. + sigchld, // Corresponds to SIGCHLD signal. + internal_exit, // Corresponds to an internal process exit. +}; + +/// Helper to return all topics, allowing easy iteration. +inline std::array all_topics() { + return {{topic_t::sighupint, topic_t::sigchld, topic_t::internal_exit}}; +} + +/// Simple value type containing the values for a topic. +/// This should be kept in sync with topic_t. +class generation_list_t { + public: + generation_list_t() = default; + + generation_t sighupint{0}; + generation_t sigchld{0}; + generation_t internal_exit{0}; + + /// \return the value for a topic. + generation_t &at(topic_t topic) { + switch (topic) { + case topic_t::sigchld: + return sigchld; + case topic_t::sighupint: + return sighupint; + case topic_t::internal_exit: + return internal_exit; + } + DIE("Unreachable"); + } + + generation_t at(topic_t topic) const { + switch (topic) { + case topic_t::sighupint: + return sighupint; + case topic_t::sigchld: + return sigchld; + case topic_t::internal_exit: + return internal_exit; + } + DIE("Unreachable"); + } + + /// \return ourselves as an array. + std::array as_array() const { return {{sighupint, sigchld, internal_exit}}; } + + /// Set the value of \p topic to the smaller of our value and the value in \p other. + void set_min_from(topic_t topic, const generation_list_t &other) { + if (this->at(topic) > other.at(topic)) { + this->at(topic) = other.at(topic); + } + } + + /// \return whether a topic is valid. + bool is_valid(topic_t topic) const { return this->at(topic) != invalid_generation; } + + /// \return whether any topic is valid. + bool any_valid() const { + bool valid = false; + for (auto gen : as_array()) { + if (gen != invalid_generation) valid = true; + } + return valid; + } + + bool operator==(const generation_list_t &rhs) const { return as_array() == rhs.as_array(); } + + bool operator!=(const generation_list_t &rhs) const { return !(*this == rhs); } + + /// return a string representation for debugging. + wcstring describe() const; + + /// Generation list containing invalid generations only. + static generation_list_t invalids() { + return generation_list_t(invalid_generation, invalid_generation, invalid_generation); + } + + private: + generation_list_t(generation_t sighupint, generation_t sigchld, generation_t internal_exit) + : sighupint(sighupint), sigchld(sigchld), internal_exit(internal_exit) {} +}; /// The topic monitor class. This permits querying the current generation values for topics, /// optionally blocking until they increase. class topic_monitor_t { private: - using topic_set_raw_t = uint8_t; - - static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count(), - "topic_set_raw is too small"); + using topic_bitmask_t = uint8_t; // Some stuff that needs to be protected by the same lock. struct data_t { - /// The current generation list. - generation_list_t current_gens{}; + /// The current values. + generation_list_t current{}; /// Whether there is a thread currently reading from the notifier pipe. bool has_reader{false}; @@ -88,7 +146,7 @@ class topic_monitor_t { /// The set of topics which have pending increments. /// This is managed via atomics. - std::atomic pending_updates_{}; + std::atomic pending_updates_{}; /// Self-pipes used to communicate changes. /// The writer is a signal handler. @@ -118,6 +176,9 @@ class topic_monitor_t { /// \return the current generation list, opportunistically applying any pending updates. generation_list_t updated_gens(); + /// Helper to convert a topic to a bitmask containing just that topic. + static topic_bitmask_t topic_to_bit(topic_t t) { return 1 << static_cast(t); } + public: topic_monitor_t(); ~topic_monitor_t(); @@ -140,11 +201,12 @@ class topic_monitor_t { /// Access the generation for a topic. generation_t generation_for_topic(topic_t topic) { return current_generations().at(topic); } - /// See if for any topic (specified in \p topics) has changed from the values in the generation - /// list \p gens. If \p wait is set, then wait if there are no changes; otherwise return - /// immediately. - /// \return the set of topics that changed, updating the generation list \p gens. - topic_set_t check(generation_list_t *gens, topic_set_t topics, bool wait); + /// For each valid topic in \p gens, check to see if the current topic is larger than + /// the value in \p gens. + /// If \p wait is set, then wait if there are no changes; otherwise return immediately. + /// \return true if some topic changed, false if none did. + /// On a true return, this updates the generation list \p gens. + bool check(generation_list_t *gens, bool wait); }; #endif