NOTE(review): this patch was reconstructed from a copy whose newlines had been collapsed and whose
angle-bracket spans (template arguments, #include targets) had been stripped. Line structure was
re-derived so that every hunk agrees with its @@ old/new line counts. Template arguments were
restored from usage visible in the patch. The three context #include targets in
src/topic_monitor.h (<limits>, <mutex>, <numeric>) are a best guess -- confirm against the tree
before applying.

diff --git a/src/common.h b/src/common.h
index ae9950ac6..b32487d21 100644
--- a/src/common.h
+++ b/src/common.h
@@ -635,6 +635,9 @@ class acquired_lock {
     /// Create from a global lock.
     /// This is used in weird cases where a global lock protects more than one piece of data.
     static acquired_lock from_global(std::mutex &lk, Data *v) { return acquired_lock{lk, v}; }
+
+    /// \return a reference to the lock, for use with a condition variable.
+    std::unique_lock<std::mutex> &get_lock() { return lock; }
 };
 
 // A lock that owns a piece of data
diff --git a/src/topic_monitor.cpp b/src/topic_monitor.cpp
index 70d05b6a7..f1fe05a6c 100644
--- a/src/topic_monitor.cpp
+++ b/src/topic_monitor.cpp
@@ -26,6 +26,12 @@
 /// pointless at-exit handler for the dtor.
 static topic_monitor_t *const s_principal = new topic_monitor_t();
 
+/// \return the metagen for a topic generation list.
+/// The metagen is simply the sum of topic generations. Note it is monotone.
+static generation_t metagen_for(const generation_list_t &lst) {
+    return std::accumulate(lst.begin(), lst.end(), generation_t{0});
+}
+
 topic_monitor_t &topic_monitor_t::principal() {
     // Do not attempt to move s_principal to a function-level static, it needs to be accessed from a
     // signal handler so it must not be lazily created.
@@ -71,9 +77,7 @@ void topic_monitor_t::post(topic_t topic) {
     // Ignore EAGAIN and other errors (which conceivably could occur during shutdown).
 }
 
-generation_list_t topic_monitor_t::updated_gens() {
-    auto current_gens = current_gen_.acquire();
-
+generation_list_t topic_monitor_t::updated_gens_in_data(acquired_lock<data_t> &data) {
     // Atomically acquire the pending updates, swapping in 0.
     // If there are no pending updates (likely), just return.
     // Otherwise CAS in 0 and update our topics.
@@ -82,7 +86,7 @@ generation_list_t topic_monitor_t::updated_gens() {
     bool cas_success;
     do {
         raw = pending_updates_.load(relaxed);
-        if (raw == 0) return *current_gens;
+        if (raw == 0) return data->current_gens;
         cas_success = pending_updates_.compare_exchange_weak(raw, 0, relaxed, relaxed);
     } while (!cas_success);
 
@@ -90,76 +94,88 @@ generation_list_t topic_monitor_t::updated_gens() {
     auto topics = topic_set_t::from_raw(raw);
     for (topic_t topic : topic_iter_t{}) {
         if (topics.get(topic)) {
-            current_gens->at(topic) += 1;
-            FLOG(topic_monitor, "Updating topic", (int)topic, "to", current_gens->at(topic));
+            data->current_gens.at(topic) += 1;
+            FLOG(topic_monitor, "Updating topic", (int)topic, "to", data->current_gens.at(topic));
         }
     }
-    return *current_gens;
+    // Report our change.
+    data_notifier_.notify_all();
+    return data->current_gens;
 }
 
-void topic_monitor_t::await_metagen(generation_t mgen) {
-    // Fast check of the metagen before taking the lock. If it's changed we're done.
-    generation_t current = current_metagen();
-    FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, ": current", current);
-    if (mgen != current) return;
+generation_list_t topic_monitor_t::updated_gens() {
+    auto data = data_.acquire();
+    return updated_gens_in_data(data);
+}
 
-    // Take the lock (which may take a long time) and then check again.
-    std::unique_lock<std::mutex> locker{wait_queue_lock_};
-    current = current_metagen();
-    FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, ": current", current,
-         "acquired lock");
-    if (mgen != current) return;
+bool topic_monitor_t::try_update_gens_maybe_becoming_reader(generation_list_t *gens) {
+    bool become_reader = false;
+    auto data = data_.acquire();
+    for (;;) {
+        // See if the updated gen list has changed. If so we don't need to become the reader.
+        auto current = updated_gens_in_data(data);
+        FLOG(topic_monitor, "TID", thread_id(), "local mgen", metagen_for(*gens), ": current",
+             metagen_for(current));
+        if (*gens != current) {
+            *gens = current;
+            break;
+        }
 
-    // Our metagen hasn't changed. Push our metagen onto the queue, then wait until we're the
-    // lowest. If multiple waiters are the lowest, then anyone can be the observer.
-    // Note the reason for picking the lowest metagen is to avoid a priority inversion where a lower
-    // metagen (therefore someone who should see changes) is blocked waiting for a higher metagen
-    // (who has already seen the changes).
-    wait_queue_.push(mgen);
-    while (wait_queue_.top() != mgen) {
-        FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "releasing lock for",
-             wait_queue_.top());
-        wait_queue_notifier_.wait(locker);
+        // The generations haven't changed. Perhaps we become the reader.
+        if (!data->has_reader) {
+            become_reader = true;
+            data->has_reader = true;
+            break;
+        }
+        // Not the reader, wait until the reader notifies us and loop again.
+        data_notifier_.wait(data.get_lock());
     }
-    wait_queue_.pop();
+    return become_reader;
+}
 
-    // We now have the lowest metagen in the wait queue. Notice we still hold the lock.
-    // Read until the metagen changes. It may already have changed.
-    // Note because changes are coalesced, we can read a lot, potentially draining the pipe.
-    current = current_metagen();
-    FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "considering waiting for mgen",
-         current);
-    while (mgen == current) {
-        int fd = pipes_.read.fd();
+generation_list_t topic_monitor_t::await_gens(const generation_list_t &input_gens) {
+    generation_list_t gens = input_gens;
+    while (gens == input_gens) {
+        bool become_reader = try_update_gens_maybe_becoming_reader(&gens);
+        if (become_reader) {
+            // Now we are the reader. Read from the pipe, and then update with any changes.
+            // Note we no longer hold the lock.
+            assert(gens == input_gens &&
+                   "Generations should not have changed if we are the reader.");
+            int fd = pipes_.read.fd();
 #if TOPIC_MONITOR_TSAN_WORKAROUND
-        // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read() call
-        // until data is available (that is, fish would use 100% cpu while waiting for processes).
-        // The select prevents that.
-        fd_set fds;
-        FD_ZERO(&fds);
-        FD_SET(fd, &fds);
-        (void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */);
+            // Under tsan our notifying pipe is non-blocking, so we would busy-loop on the read()
+            // call until data is available (that is, fish would use 100% cpu while waiting for
+            // processes). The select prevents that.
+            fd_set fds;
+            FD_ZERO(&fds);
+            FD_SET(fd, &fds);
+            (void)select(fd + 1, &fds, nullptr, nullptr, nullptr /* timeout */);
 #endif
-        uint8_t ignored[PIPE_BUF];
-        (void)read(fd, ignored, sizeof ignored);
-        current = current_metagen();
-        FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen,
-             "read() complete, current mgen is", current);
-    }
+            uint8_t ignored[PIPE_BUF];
+            (void)read(fd, ignored, sizeof ignored);
 
-    // Release the lock and wake up the remaining waiters.
-    FLOG(topic_monitor, "TID", thread_id(), "local mgen", mgen, "awakening all waiters");
-    locker.unlock();
-    wait_queue_notifier_.notify_all();
+            // We are finished reading. We must stop being the reader, and post on the condition
+            // variable to wake up any other threads waiting for us to finish reading.
+            auto data = data_.acquire();
+            gens = data->current_gens;
+            FLOG(topic_monitor, "TID", thread_id(), "local mgen", metagen_for(input_gens),
+                 "read() complete, current mgen is", metagen_for(gens));
+            assert(data->has_reader && "We should be the reader");
+            data->has_reader = false;
+            data_notifier_.notify_all();
+        }
+    }
+    return gens;
 }
 
 topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics, bool wait) {
     if (topics.none()) return topics;
 
+    generation_list_t current = updated_gens();
     topic_set_t changed{};
     for (;;) {
         // Load the topic list and see if anything has changed.
-        generation_list_t current = updated_gens();
         for (topic_t topic : topic_iter_t{}) {
             if (topics.get(topic)) {
                 assert(gens->at(topic) <= current.at(topic) &&
@@ -176,9 +192,8 @@ topic_set_t topic_monitor_t::check(generation_list_t *gens, topic_set_t topics,
             break;
         }
 
-        // Try again. Note that we use the metagen corresponding to the topic list we just
-        // inspected, not the current one (which may have updates since we checked).
-        await_metagen(metagen_for(current));
+        // Wait until our gens change.
+        current = await_gens(current);
     }
     return changed;
 }
diff --git a/src/topic_monitor.h b/src/topic_monitor.h
index 574e72c21..4bd86f7f0 100644
--- a/src/topic_monitor.h
+++ b/src/topic_monitor.h
@@ -11,7 +11,6 @@
 #include <limits>
 #include <mutex>
 #include <numeric>
-#include <queue>
 
 /** Topic monitoring support. Topics are conceptually "a thing that can happen." For example,
  delivery of a SIGINT, a child process exits, etc. It is possible to post to a topic, which means
@@ -73,47 +72,52 @@ class topic_monitor_t {
     static_assert(sizeof(topic_set_raw_t) * CHAR_BIT >= enum_count<topic_t>(),
                   "topic_set_raw is too small");
 
-    /// The current topic generation list, protected by a mutex. Note this may be opportunistically
-    /// updated at the point it is queried.
-    owning_lock<generation_list_t> current_gen_{{}};
+    // Some stuff that needs to be protected by the same lock.
+    struct data_t {
+        /// The current generation list.
+        generation_list_t current_gens{};
+
+        /// Whether there is a thread currently reading from the notifier pipe.
+        bool has_reader{false};
+    };
+    owning_lock<data_t> data_{};
+
+    /// Condition variable for broadcasting notifications.
+    /// This is associated with data_'s mutex.
+    std::condition_variable data_notifier_{};
 
     /// The set of topics which have pending increments.
     /// This is managed via atomics.
     std::atomic<topic_set_raw_t> pending_updates_{};
 
-    /// When a topic set is queried in a blocking way, the waiters are put into a queue. The waiter
-    /// with the smallest metagen is responsible for announcing the change to the rest of the
-    /// waiters. (The metagen is just the sum of the current generations.) Note that this is a
-    /// max-heap that defaults to std::less; by using std::greater it becomes a min heap. This is
-    /// protected by wait_queue_lock_.
-    std::priority_queue<generation_t, std::vector<generation_t>, std::greater<generation_t>>
-        wait_queue_;
-
-    /// Mutex guarding the wait queue.
-    std::mutex wait_queue_lock_{};
-
-    /// Condition variable for broadcasting notifications.
-    std::condition_variable wait_queue_notifier_{};
-
-    /// Pipes used to communicate changes from the signal handler.
+    /// Self-pipes used to communicate changes.
+    /// The writer is a signal handler.
+    /// "The reader" refers to a thread that wants to wait for changes. Only one thread can be the
+    /// reader at a given time.
     autoclose_pipes_t pipes_;
 
-    /// \return the metagen for a topic generation list.
-    /// The metagen is simply the sum of topic generations. Note it is monotone.
-    static inline generation_t metagen_for(const generation_list_t &lst) {
-        return std::accumulate(lst.begin(), lst.end(), generation_t{0});
-    }
+    /// Apply any pending updates to the data.
+    /// This accepts data because it must be locked.
+    /// \return the updated generation list.
+    generation_list_t updated_gens_in_data(acquired_lock<data_t> &data);
 
-    /// Wait for the current metagen to become different from \p gen.
-    /// If it is already different, return immediately.
-    void await_metagen(generation_t gen);
+    /// Given a list of input generations, attempt to update them to something newer.
+    /// If \p gens is older, then just return those by reference, and directly return false (not
+    /// becoming the reader).
+    /// If \p gens is current and there is not a reader, then do not update \p gens and return true,
+    /// indicating we should become the reader. Now it is our responsibility to read from the pipes
+    /// and notify on a change via the condition variable.
+    /// If \p gens is current, and there is already a reader, then wait until the reader notifies us
+    /// and try again.
+    bool try_update_gens_maybe_becoming_reader(generation_list_t *gens);
 
-    /// Return the current generation list, opportunistically applying any pending updates.
+    /// Wait for some entry in the list of generations to change.
+    /// \return the new gens.
+    generation_list_t await_gens(const generation_list_t &input_gens);
+
+    /// \return the current generation list, opportunistically applying any pending updates.
     generation_list_t updated_gens();
 
-    /// \return the metagen for the current topic generation list.
-    inline generation_t current_metagen() { return metagen_for(updated_gens()); }
-
    public:
     topic_monitor_t();
     ~topic_monitor_t();